From 9274a05323191b16affa7cb11adb7b16029d694c Mon Sep 17 00:00:00 2001
From: Roy Peter D'Souza
Date: Wed, 29 Apr 2026 20:27:12 -0700
Subject: [PATCH 1/2] Local audit remediations: swap monitoring and MoE memory budgeting

---
 README.md                                     |  14 -
 Sources/DFlash/DFlashIntermediateDumper.swift |   1 +
 Sources/DFlash/DFlashRuntime.swift            |   2 +-
 .../MLXInferenceCore/GenerationConfig.swift   |  30 +-
 .../MLXInferenceCore/InferenceEngine.swift    | 186 +---
 .../ModelDownloadManager.swift                | 133 +--
 .../MLXInferenceCore/ModelDownloader.swift    | 442 +-------
 Sources/MLXInferenceCore/ModelStorage.swift   | 421 +-------
 Sources/SwiftLM/ModelProfiler.swift           |  54 +-
 Sources/SwiftLM/Server.swift                  |  71 +-
 SwiftBuddy/SwiftBuddy/SwiftBuddyApp.swift     |  10 +-
 .../SwiftBuddy/ViewModels/ServerManager.swift | 178 +---
 SwiftBuddy/SwiftBuddy/Views/ChatView.swift    |  81 +-
 .../SwiftBuddy/Views/InspectorView.swift      |  17 +-
 .../Views/ModelManagementView.swift           |  25 +-
 .../SwiftBuddy/Views/ModelPickerView.swift    |   2 +-
 SwiftBuddy/SwiftBuddy/Views/ModelsView.swift  | 115 +-
 SwiftBuddy/SwiftBuddy/Views/RootView.swift    |  51 +-
 .../SwiftBuddy/Views/SettingsView.swift       | 998 +++++-------------
 SwiftBuddy/generate_xcodeproj.py              |  28 +-
 scripts/profiling/profile_runner.py           |  14 +-
 21 files changed, 509 insertions(+), 2364 deletions(-)

diff --git a/README.md b/README.md
index 6b9d5eb1..3a8d3778 100644
--- a/README.md
+++ b/README.md
@@ -73,20 +73,6 @@
 Benchmark results for `gemma-4-26b-a4b-it-4bit` (26B MoE, 4-bit) on M5 Pro 64 GB
 
 > Run `./run_benchmark.sh` to generate these metrics on your own device. (See **Benchmarks & Testing** below).
 
-### Qwen3.6-35B-A3B-UD-MLX-4bit (Full-RAM) — M1 Ultra 64 GB
-
-Benchmark results for full-RAM (no SSD streaming) MoE inference on M1 Ultra. The 3.4× vanilla improvement vs. earlier builds comes from the `needsMoeFlush` gate in `mlx-swift-lm` (see [SwiftLM #84](https://github.com/SharpAI/SwiftLM/issues/84)) — the per-layer GPU sync barrier required for SSD streaming was firing unconditionally on the full-RAM path and flushing MLX's kernel-batching pipeline.
-
-| Configuration | Short (~126 tok) | Medium (~400 tok) | Long (~800 tok) |
-|---|---|---|---|
-| **Vanilla full-GPU** | **61.7 tok/s** | **62.3 tok/s** | **62.1 tok/s** |
-
-> *Hardware:* Apple M1 Ultra, 64 GB unified memory, macOS 26.x. Model ~20 GB on disk, ~21.6 GB resident weight + ~2.1 GB KV at runtime.
-> *Flags:* `--repeat-penalty 1.1 --max-tokens 2000`, `temperature: 0.6`, single-stream `/v1/chat/completions`.
-> *Vanilla baseline before* `needsMoeFlush` *gate (for reference):* 19.2 / 18.1 / 18.3 tok/s — see #84.
-
-> ⚠️ **DFlash on this model is currently unsuitable for production.** DFlash uses pure greedy (`argMax`) decoding regardless of `temperature`, which on Qwen3.6-35B-A3B + the [`z-lab/Qwen3.6-35B-A3B-DFlash`](https://huggingface.co/z-lab/Qwen3.6-35B-A3B-DFlash) draft locks into low-entropy attractors (`"and and and..."`, `"**UMA** **UMA**..."`). Earlier 70 tok/s DFlash numbers were degenerate output that scored high acceptance because draft and target both committed to the same locked-in token. Repetition-penalty mitigation works on some prompts but tanks acceptance on others — the proper fix is stochastic posterior sampling with rejection-based accept ([Leviathan/Chen](https://arxiv.org/abs/2211.17192) formulation), which is a DFlash architecture change tracked at [z-lab/dflash#91](https://github.com/z-lab/dflash/issues/91).
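For reference, a minimal Swift sketch of the rejection-based accept step that fix calls for ([Leviathan/Chen](https://arxiv.org/abs/2211.17192)), assuming per-token probabilities from both draft and target models are available; function and parameter names here are illustrative, not DFlash's actual API:

```swift
import Foundation

/// Accept one drafted token with probability min(1, p_target / p_draft).
/// `targetProb`/`draftProb` are illustrative names, not the DFlash API.
func acceptsDraftToken(targetProb: Double, draftProb: Double) -> Bool {
    let acceptProb = min(1.0, targetProb / max(draftProb, 1e-12))
    return Double.random(in: 0..<1) < acceptProb
}

/// On rejection, resample from the residual r(x) ∝ max(0, p_target(x) − p_draft(x)).
/// This is what preserves the target distribution exactly (greedy argMax does not).
func residualSample(targetProbs: [Double], draftProbs: [Double]) -> Int {
    let residual = zip(targetProbs, draftProbs).map { max(0.0, $0.0 - $0.1) }
    let total = residual.reduce(0, +)
    guard total > 0 else {
        // Degenerate case: draft and target agree everywhere, fall back to target argmax.
        return targetProbs.indices.max(by: { targetProbs[$0] < targetProbs[$1] }) ?? 0
    }
    var u = Double.random(in: 0..<total)
    for (index, mass) in residual.enumerated() {
        u -= mass
        if u <= 0 { return index }
    }
    return residual.count - 1
}
```

Accept-then-residual-resample reproduces the target distribution exactly, so `temperature` is honored instead of collapsing into the greedy attractors described above.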
- ### DeepSeek-V4-Flash (126 GB, Q3-mixed-gs128-affine) — M5 Pro 64 GB Model: [`Thump604/DeepSeek-V4-Flash-MLX-Q3-mixed-gs128-affine`](https://huggingface.co/Thump604/DeepSeek-V4-Flash-MLX-Q3-mixed-gs128-affine) diff --git a/Sources/DFlash/DFlashIntermediateDumper.swift b/Sources/DFlash/DFlashIntermediateDumper.swift index 1401cb9b..a9802aff 100644 --- a/Sources/DFlash/DFlashIntermediateDumper.swift +++ b/Sources/DFlash/DFlashIntermediateDumper.swift @@ -42,6 +42,7 @@ public enum DFlashDumper { eval(floatArr) let shape = (0.. 8192 tokens to ~3.5 bits/token. - public var turboKV: Bool - - /// Enable SSD expert streaming for MoE models. - public var streamExperts: Bool - - /// Chunk size for prefill evaluation. - /// Lower values prevent GPU timeout on large models. - public var prefillSize: Int - - /// KV-cache quantization bits (nil = no quantization, 4 or 8 typical). - public var kvBits: Int? - - /// KV-cache quantization group size (default 64). - public var kvGroupSize: Int - public init( maxTokens: Int = 2048, temperature: Float = 0.6, @@ -38,12 +20,7 @@ public struct GenerationConfig: Sendable { minP: Float = 0.0, repetitionPenalty: Float = 1.05, seed: UInt64? = nil, - enableThinking: Bool = false, - turboKV: Bool = false, - streamExperts: Bool = false, - prefillSize: Int = 512, - kvBits: Int? = nil, - kvGroupSize: Int = 64 + enableThinking: Bool = false ) { self.maxTokens = maxTokens self.temperature = temperature @@ -53,11 +30,6 @@ public struct GenerationConfig: Sendable { self.repetitionPenalty = repetitionPenalty self.seed = seed self.enableThinking = enableThinking - self.turboKV = turboKV - self.streamExperts = streamExperts - self.prefillSize = prefillSize - self.kvBits = kvBits - self.kvGroupSize = kvGroupSize } public static let `default` = GenerationConfig() diff --git a/Sources/MLXInferenceCore/InferenceEngine.swift b/Sources/MLXInferenceCore/InferenceEngine.swift index 38d5b396..ed1c34ae 100644 --- a/Sources/MLXInferenceCore/InferenceEngine.swift +++ b/Sources/MLXInferenceCore/InferenceEngine.swift @@ -114,10 +114,6 @@ public final class InferenceEngine: ObservableObject { @Published public private(set) var activeContextTokens: Int = 0 @Published public private(set) var maxContextWindow: Int = 0 - /// Set when a corrupted/truncated model is detected during inference. - /// The UI should observe this and offer to delete & re-download. - @Published public var corruptedModelId: String? = nil - /// Whether to automatically unload the model when the app backgrounds /// and reload it when returning to foreground. /// Defaults to true on iOS (prevents jetsam), false on macOS. @@ -281,44 +277,7 @@ public final class InferenceEngine: ObservableObject { state = .error("Device is too hot. Let it cool before loading a model.") return } - corruptedModelId = nil - - guard ModelStorage.verifyModelIntegrity(for: modelId) else { - await downloadThenLoad(modelId: modelId) - return - } - - await loadVerifiedModel(modelId: modelId) - } - - private func downloadThenLoad(modelId: String) async { - print("[InferenceEngine] Model \(modelId) is missing or incomplete. 
Starting download before load.") - releaseLoadedModelResources() - state = .downloading(progress: 0.0, speed: "Preparing...") - - let task = downloadManager.startDownload(modelId: modelId) - - do { - try await task.value - state = .downloading(progress: 1.0, speed: "Verifying...") - - guard ModelStorage.verifyModelIntegrity(for: modelId) else { - markModelCorrupted( - modelId: modelId, - message: "Model files are incomplete after download. Choose a recovery option." - ) - return - } - - await loadVerifiedModel(modelId: modelId) - } catch is CancellationError { - state = .idle - } catch { - state = .error("Failed to download \(modelId): \(error.localizedDescription)") - } - } - private func loadVerifiedModel(modelId: String) async { state = .loading currentModelId = modelId @@ -353,21 +312,17 @@ public final class InferenceEngine: ObservableObject { downloader: downloader ) - let speedTracker = DownloadSpeedTracker() - if architecture.supportsVision { container = try await VLMModelFactory.shared.loadContainer( from: downloader, using: TransformersTokenizerLoader(), configuration: config ) { [weak self] progress in - speedTracker.record(totalBytes: progress.completedUnitCount) - let smoothedSpeed = speedTracker.speedBytesPerSec - Task { @MainActor in guard let self else { return } let pct = progress.fractionCompleted - let speedStr = smoothedSpeed + let speedBytesPerSec = progress.userInfo[ProgressUserInfoKey("throughputKey")] as? Double + let speedStr = speedBytesPerSec .map { String(format: "%.1f MB/s", $0 / 1_000_000) } ?? "" self.state = .downloading(progress: pct, speed: speedStr) @@ -375,7 +330,7 @@ public final class InferenceEngine: ObservableObject { modelId: modelId, fractionCompleted: pct, currentFile: "", - speedMBps: smoothedSpeed.map { $0 / 1_000_000 } + speedMBps: speedBytesPerSec.map { $0 / 1_000_000 } )) } } @@ -385,13 +340,11 @@ public final class InferenceEngine: ObservableObject { using: TransformersTokenizerLoader(), configuration: config ) { [weak self] progress in - speedTracker.record(totalBytes: progress.completedUnitCount) - let smoothedSpeed = speedTracker.speedBytesPerSec - Task { @MainActor in guard let self else { return } let pct = progress.fractionCompleted - let speedStr = smoothedSpeed + let speedBytesPerSec = progress.userInfo[ProgressUserInfoKey("throughputKey")] as? Double + let speedStr = speedBytesPerSec .map { String(format: "%.1f MB/s", $0 / 1_000_000) } ?? "" self.state = .downloading(progress: pct, speed: speedStr) @@ -399,7 +352,7 @@ public final class InferenceEngine: ObservableObject { modelId: modelId, fractionCompleted: pct, currentFile: "", - speedMBps: smoothedSpeed.map { $0 / 1_000_000 } + speedMBps: speedBytesPerSec.map { $0 / 1_000_000 } )) } } @@ -408,85 +361,26 @@ public final class InferenceEngine: ObservableObject { downloadManager.clearProgress(modelId: modelId) downloadManager.lastLoadedModelId = modelId downloadManager.refresh() - - // Verify integrity to catch incomplete downloads before marking as ready - guard ModelStorage.verifyModelIntegrity(for: modelId) else { - throw NSError(domain: "InferenceEngine", code: 1, userInfo: [NSLocalizedDescriptionKey: "Model safetensors files are incomplete. 
Please delete and re-download."]) - } - - // Read the model's actual max context length from config.json - if let ctxLen = ModelStorage.readMaxContextLength(for: modelId) { - self.maxContextWindow = ctxLen - print("[InferenceEngine] Model context window: \(ctxLen) tokens") - } else { - self.maxContextWindow = 8192 // conservative fallback for models without explicit limits - print("[InferenceEngine] No explicit context limit found in config.json, defaulting to 8192") - } - state = .ready(modelId: modelId) } catch { ExpertStreamingConfig.shared.deactivate() downloadManager.clearProgress(modelId: modelId) state = .error("Failed to load \(modelId): \(error.localizedDescription)") - - // If the model is incomplete/corrupted, flag it so the UI shows the "Delete & Re-download" button - let nsError = error as NSError - if nsError.domain == "InferenceEngine" && nsError.code == 1 || Self.isModelCorruptionError(error) { - markModelCorrupted( - modelId: modelId, - message: "Model weights are corrupted or incomplete. Choose a recovery option." - ) - return - } - container = nil - self.maxContextWindow = 0 - self.activeContextTokens = 0 } } /// Unload the current model and free all GPU memory. public func unload() { - releaseLoadedModelResources() - corruptedModelId = nil - state = .idle - } - - private func releaseLoadedModelResources() { generationTask?.cancel() - generationTask = nil container = nil currentModelId = nil - maxContextWindow = 0 - activeContextTokens = 0 + state = .idle ExpertStreamingConfig.shared.deactivate() MLX.Memory.cacheLimit = 0 } - private func markModelCorrupted(modelId: String?, message: String) { - let failedModelId = modelId ?? currentModelId - releaseLoadedModelResources() - state = .error(message) - corruptedModelId = failedModelId - } - - private static func isModelCorruptionError(_ error: Error) -> Bool { - let description = error.localizedDescription.lowercased() - return description.contains("ssd streaming") - || description.contains("pread") - || description.contains("safetensors") - || description.contains("corrupt") - || description.contains("incomplete") - } - - public func clearCorruptionRecovery() { - corruptedModelId = nil - if case .error = state { - state = .idle - } - } - // MARK: — Generation public nonisolated func generate( @@ -528,17 +422,11 @@ public final class InferenceEngine: ObservableObject { } let mlxMessages = finalMessages - var params = GenerateParameters( - maxTokens: config.maxTokens, - kvBits: config.kvBits, - kvGroupSize: config.kvGroupSize, - temperature: config.temperature, - topP: config.topP, - topK: config.topK, - minP: config.minP, - repetitionPenalty: config.repetitionPenalty, - prefillStepSize: config.prefillSize - ) + var params = GenerateParameters(temperature: config.temperature) + params.topP = config.topP + params.topK = config.topK + params.minP = config.minP + params.repetitionPenalty = config.repetitionPenalty params.repetitionContextSize = 20 var thinkingActive = false @@ -554,7 +442,9 @@ public final class InferenceEngine: ObservableObject { let baseTokens = Int(Double(stringLength) / 3.5) self.activeContextTokens = baseTokens - // maxContextWindow is already set during loadModel() from config.json + // If we have a max length config, expose it + // TODO: Safely extract from ModelConfiguration when MLX exposes it dynamically + self.maxContextWindow = 8192 let stream: AsyncStream = try await container.generate( input: lmInput, @@ -595,30 +485,11 @@ public final class InferenceEngine: ObservableObject { 
continuation.yield(GenerationToken(text: text, isThinking: thinkingActive)) } } - } catch let ssdError as SSDStreamingError { - // Corrupted/truncated safetensors — surface a clear, actionable error - let msg = "Model weights are corrupted or incomplete. Please re-download the model." - print("[InferenceEngine] SSD Streaming Error: \(ssdError.localizedDescription)") - continuation.yield(GenerationToken(text: "\n\n[Error: \(msg)]")) - self.markModelCorrupted(modelId: self.currentModelId, message: msg) } catch { - // Check if the generic error is also an SSD streaming issue - if Self.isModelCorruptionError(error) { - let msg = "Model weights are corrupted or incomplete. Please re-download the model." - self.markModelCorrupted(modelId: self.currentModelId, message: msg) - } continuation.yield(GenerationToken(text: "\n\n[Error: \(error.localizedDescription)]")) } - if let latchedError = SSDStreamingErrorLatch.shared.consume() { - let msg = "Model weights are corrupted or incomplete. Please re-download the model." - print("[InferenceEngine] Latched SSD error after generation: \(latchedError.localizedDescription)") - self.markModelCorrupted(modelId: self.currentModelId, message: msg) - } else if case .error = self.state { - // Already in error state from catch block above - } else { - self.state = self.currentModelId.map { .ready(modelId: $0) } ?? .idle - } + self.state = self.currentModelId.map { .ready(modelId: $0) } ?? .idle continuation.finish() } } @@ -629,29 +500,4 @@ public final class InferenceEngine: ObservableObject { generationTask = nil if let id = currentModelId { state = .ready(modelId: id) } } - - /// Delete corrupted model files and start a fresh download. - /// Called from the UI when the user confirms re-download after corruption is detected. - public func deleteCorruptedAndRedownload() { - guard let modelId = corruptedModelId else { return } - - releaseLoadedModelResources() - state = .downloading(progress: 0.0, speed: "Deleting corrupted files...") - - do { - try ModelStorage.delete(modelId) - print("[InferenceEngine] Successfully deleted corrupted cache directory for \(modelId).") - } catch { - print("[InferenceEngine] FAILED to delete corrupted cache: \(error.localizedDescription)") - state = .error("Failed to delete corrupted model: \(error.localizedDescription)") - return - } - downloadManager.refresh() - corruptedModelId = nil - - print("[InferenceEngine] Deleted corrupted files for \(modelId), starting fresh download") - Task { @MainActor in - await downloadThenLoad(modelId: modelId) - } - } } diff --git a/Sources/MLXInferenceCore/ModelDownloadManager.swift b/Sources/MLXInferenceCore/ModelDownloadManager.swift index 309bf4c6..5a0c8b07 100644 --- a/Sources/MLXInferenceCore/ModelDownloadManager.swift +++ b/Sources/MLXInferenceCore/ModelDownloadManager.swift @@ -52,13 +52,10 @@ public final class ModelDownloadManager: ObservableObject { // MARK: Published state @Published public private(set) var downloadedModels: [DownloadedModel] = [] - @Published public private(set) var incompleteDownloads: [ModelStorage.IncompleteDownload] = [] @Published public private(set) var activeDownloads: [String: ModelDownloadProgress] = [:] @Published public private(set) var totalDiskUsageBytes: Int64 = 0 @Published public private(set) var networkStatus: NetworkStatus = .unknown - private var downloadedModelIDs: Set = [] - // MARK: Persistence private let lastModelKey = "swiftlm.lastLoadedModelId" public var lastLoadedModelId: String? 
{ @@ -133,86 +130,41 @@ public final class ModelDownloadManager: ObservableObject { modifiedDate: s.modifiedDate ) } - downloadedModelIDs = Set(downloadedModels.map(\.id)) totalDiskUsageBytes = downloadedModels.reduce(0) { $0 + $1.sizeBytes } - - // Scan for interrupted downloads that can be resumed - incompleteDownloads = ModelStorage.scanIncompleteDownloads() - .filter { incomplete in - // Exclude models that are already actively downloading - !activeDownloads.keys.contains(incomplete.id) - } } public func isDownloaded(_ modelId: String) -> Bool { - downloadedModelIDs.contains(modelId) - } - - /// True if a model has a partial download that can be resumed. - public func hasIncompleteDownload(_ modelId: String) -> Bool { - incompleteDownloads.contains { $0.id == modelId } + ModelStorage.isDownloaded(modelId) } public func downloadedModel(for modelId: String) -> DownloadedModel? { downloadedModels.first(where: { $0.id == modelId }) } - /// Delete a model and free disk space (including any partial downloads). + /// Delete a model and free disk space. public func delete(_ modelId: String) throws { try ModelStorage.delete(modelId) refresh() if lastLoadedModelId == modelId { lastLoadedModelId = nil } } - /// Resume an incomplete download. This calls startDownload() which will - /// automatically resume from where it left off (partial files + HTTP Range). - @discardableResult - public func resumeDownload(modelId: String) -> Task { - return startDownload(modelId: modelId) - } - // MARK: — Download - /// Start downloading a model. - /// iOS: Uses ModelDownloader with per-file resume + retry. - /// macOS: Uses HubApi.snapshot() which handles resume internally; we add retry around it. + /// Start downloading a model (iOS only — macOS goes through LLMModelFactory in InferenceEngine.load()). @discardableResult - public func startDownload( - modelId: String, - retryConfig: DownloadRetryConfig = .default - ) -> Task { - print("[ModelDownloadManager] startDownload called for \(modelId)") + public func startDownload(modelId: String) -> Task { downloadTasks[modelId]?.cancel() let task = Task { - print("[ModelDownloadManager] Task started for \(modelId)") - // Instantly register 0% progress so UI banners appear immediately - // before the Hub API computes the file snapshot. - Task { @MainActor [weak self] in - if self?.activeDownloads[modelId] == nil { - print("[ModelDownloadManager] Registering 0% progress for \(modelId)") - self?.activeDownloads[modelId] = ModelDownloadProgress( - modelId: modelId, - fractionCompleted: 0.0, - currentFile: "Preparing download...", - speedMBps: nil - ) - } - } - do { defer { - print("[ModelDownloadManager] Defer executing, removing activeDownload for \(modelId)") Task { @MainActor [weak self] in self?.activeDownloads.removeValue(forKey: modelId) } } #if !os(macOS) - try await ModelDownloader.shared.download( - modelId: modelId, - retryConfig: retryConfig - ) { [weak self] fp in + try await ModelDownloader.shared.download(modelId: modelId) { [weak self] fp in Task { @MainActor [weak self] in self?.activeDownloads[modelId] = ModelDownloadProgress( modelId: modelId, @@ -223,68 +175,23 @@ public final class ModelDownloadManager: ObservableObject { } } #else - // macOS: HubApi.snapshot() already supports resume via incomplete blob - // files and HTTP Range headers. We add retry for transient failures. - let speedTracker = DownloadSpeedTracker() - var lastError: Error? 
- for attempt in 0...retryConfig.maxRetries { - do { - if attempt > 0 { - let delay = retryConfig.delay(for: attempt - 1) - print("[ModelDownloadManager] Retry \(attempt)/\(retryConfig.maxRetries) for \(modelId) after \(String(format: "%.1f", delay))s") - try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) - try Task.checkCancellation() - speedTracker.reset() + let hub = HubApi(downloadBase: ModelStorage.cacheRoot) + _ = try await hub.snapshot( + from: modelId, + matching: ["*.safetensors", "*.json", "*.model", "*.txt", "*.tiktoken"], + progressHandler: { @Sendable [weak self] progress in + Task { @MainActor [weak self] in + let pct = progress.fractionCompleted + let speedBytesPerSec = progress.userInfo[ProgressUserInfoKey("throughputKey")] as? Double + self?.activeDownloads[modelId] = ModelDownloadProgress( + modelId: modelId, + fractionCompleted: pct, + currentFile: "", + speedMBps: speedBytesPerSec.map { $0 / 1_000_000 } + ) } - - let hub = HubApi(downloadBase: ModelStorage.cacheRoot) - print("[ModelDownloadManager] Calling hub.snapshot for \(modelId)") - _ = try await hub.snapshot( - from: modelId, - matching: ["*.safetensors", "*.json", "*.model", "*.txt", "*.tiktoken"], - progressHandler: { @Sendable [weak self] progress in - // Feed cumulative bytes into the EWMA tracker - speedTracker.record(totalBytes: progress.completedUnitCount) - let smoothedSpeed = speedTracker.speedBytesPerSec - - Task { @MainActor [weak self] in - let pct = progress.fractionCompleted - self?.activeDownloads[modelId] = ModelDownloadProgress( - modelId: modelId, - fractionCompleted: pct, - currentFile: attempt > 0 ? "(retry \(attempt))" : "", - speedMBps: smoothedSpeed.map { $0 / 1_000_000 } - ) - } - } - ) - print("[ModelDownloadManager] hub.snapshot FINISHED SUCCESSFULLY for \(modelId)") - lastError = nil - break // Success - } catch is CancellationError { - print("[ModelDownloadManager] Task was CANCELLED for \(modelId)") - throw CancellationError() - } catch { - lastError = error - print("[ModelDownloadManager] Download failed for \(modelId): \(error.localizedDescription)") - // Only retry transient network errors - if let urlError = error as? URLError { - switch urlError.code { - case .cancelled, .userCancelledAuthentication: - throw error - case .notConnectedToInternet, .networkConnectionLost, - .timedOut, .cannotConnectToHost, .dnsLookupFailed: - continue - default: - if attempt >= retryConfig.maxRetries { throw error } - continue - } - } - // Non-URLError (e.g. auth failure) — don't retry - throw error } - } - if let error = lastError { throw error } + ) #endif Task { @MainActor [weak self] in diff --git a/Sources/MLXInferenceCore/ModelDownloader.swift b/Sources/MLXInferenceCore/ModelDownloader.swift index f8652b9f..bb6ca2c9 100644 --- a/Sources/MLXInferenceCore/ModelDownloader.swift +++ b/Sources/MLXInferenceCore/ModelDownloader.swift @@ -4,12 +4,8 @@ // (called directly from InferenceEngine.load — no separate downloader needed) // // iOS: Uses HuggingFace API to enumerate model files, then downloads -// each file via URLSession to ModelStorage.cacheRoot +// each file via URLSession background session to ModelStorage.cacheRoot // so LLMModelFactory can find them on next load without re-downloading. 
-// -// Both paths support: -// • Resume after restart (partial files preserved + HTTP Range) -// • Automatic retry with exponential backoff import Foundation import Hub @@ -25,7 +21,6 @@ public struct DownloadFileProgress: Sendable { public let fileFractionCompleted: Double public let totalBytesDownloaded: Int64 public let speedBytesPerSec: Double? - public let retryAttempt: Int public var overallFraction: Double { let fileDone = Double(max(fileIndex - 1, 0)) / Double(max(fileCount, 1)) @@ -39,141 +34,6 @@ public struct DownloadFileProgress: Sendable { ? String(format: "%.1f MB/s", s / 1_000_000) : String(format: "%.0f KB/s", s / 1_000) } - - public init( - modelId: String, - fileName: String, - fileIndex: Int, - fileCount: Int, - fileFractionCompleted: Double, - totalBytesDownloaded: Int64, - speedBytesPerSec: Double?, - retryAttempt: Int = 0 - ) { - self.modelId = modelId - self.fileName = fileName - self.fileIndex = fileIndex - self.fileCount = fileCount - self.fileFractionCompleted = fileFractionCompleted - self.totalBytesDownloaded = totalBytesDownloaded - self.speedBytesPerSec = speedBytesPerSec - self.retryAttempt = retryAttempt - } -} - -// MARK: — Retry Configuration - -public struct DownloadRetryConfig: Sendable { - /// Maximum number of retry attempts per file (0 = no retry) - public let maxRetries: Int - /// Initial delay before the first retry (doubles each attempt) - public let initialDelaySeconds: Double - /// Maximum delay cap to prevent extremely long waits - public let maxDelaySeconds: Double - - public static let `default` = DownloadRetryConfig( - maxRetries: 3, - initialDelaySeconds: 2.0, - maxDelaySeconds: 30.0 - ) - - /// Calculate delay for a given attempt (exponential backoff with jitter) - func delay(for attempt: Int) -> TimeInterval { - let base = initialDelaySeconds * pow(2.0, Double(attempt)) - let capped = min(base, maxDelaySeconds) - // Add ±25% jitter to prevent thundering herd - let jitter = capped * Double.random(in: -0.25...0.25) - return max(0.5, capped + jitter) - } -} - -// MARK: — Speed Tracker - -/// Tracks download speed using an exponentially weighted moving average (EWMA) -/// over a sliding window. Produces stable, human-readable speed values instead -/// of volatile per-chunk calculations. -public final class DownloadSpeedTracker: @unchecked Sendable { - private let lock = NSLock() - - /// Samples: (timestamp, cumulativeBytes) - private var samples: [(time: TimeInterval, bytes: Int64)] = [] - /// How far back (seconds) to look for the rolling average - private let windowSeconds: TimeInterval - /// EWMA smoothing factor (0.0–1.0). Higher = more responsive, lower = smoother. - private let alpha: Double - /// Current EWMA speed in bytes/sec - private var ewmaSpeed: Double = 0 - /// Absolute start time for elapsed calculation - private let startTime: TimeInterval - /// Total bytes at start (for resumed downloads) - private let startBytes: Int64 - - public init(windowSeconds: TimeInterval = 5.0, alpha: Double = 0.3, resumeOffset: Int64 = 0) { - self.windowSeconds = windowSeconds - self.alpha = alpha - self.startTime = ProcessInfo.processInfo.systemUptime - self.startBytes = resumeOffset - } - - /// Record a cumulative byte count at the current time. 
- public func record(totalBytes: Int64) { - lock.lock() - defer { lock.unlock() } - - let now = ProcessInfo.processInfo.systemUptime - samples.append((time: now, bytes: totalBytes)) - - // Prune samples older than the window - let cutoff = now - windowSeconds - samples.removeAll { $0.time < cutoff } - - // Calculate instantaneous speed from oldest sample in window - if samples.count >= 2, let oldest = samples.first { - let dt = now - oldest.time - if dt > 0.1 { - let instantSpeed = Double(totalBytes - oldest.bytes) / dt - // EWMA blend - if ewmaSpeed == 0 { - ewmaSpeed = instantSpeed - } else { - ewmaSpeed = alpha * instantSpeed + (1 - alpha) * ewmaSpeed - } - } - } - } - - /// Current smoothed speed in bytes/sec. Returns nil if no meaningful data yet. - public var speedBytesPerSec: Double? { - lock.lock() - defer { lock.unlock() } - return ewmaSpeed > 0 ? ewmaSpeed : nil - } - - /// Overall average speed since tracking began (bytes/sec). - public func overallAverageSpeed(currentBytes: Int64) -> Double? { - let elapsed = ProcessInfo.processInfo.systemUptime - startTime - guard elapsed > 0.5 else { return nil } - let downloaded = currentBytes - startBytes - return downloaded > 0 ? Double(downloaded) / elapsed : nil - } - - /// Reset for a new file while keeping the tracker alive. - public func reset(resumeOffset: Int64 = 0) { - lock.lock() - defer { lock.unlock() } - samples.removeAll() - ewmaSpeed = 0 - } -} - -private struct DownloadedFileSizeMismatchError: LocalizedError { - let fileName: String - let expectedSize: Int64 - let actualSize: Int64 - - var errorDescription: String? { - "Downloaded file \(fileName) is incomplete (expected \(expectedSize) bytes, got \(actualSize))." - } } // MARK: — Downloader actor @@ -183,16 +43,18 @@ public actor ModelDownloader { public static let shared = ModelDownloader() private init() {} - // MARK: — iOS: URLSession download with resume + retry + // MARK: — iOS: URLSession background download #if !os(macOS) - private lazy var session: URLSession = { - let config = URLSessionConfiguration.default + private lazy var backgroundSession: URLSession = { + let config = URLSessionConfiguration.background( + withIdentifier: "com.sharpai.swiftlmchat.modeldownload" + ) + config.isDiscretionary = false + config.sessionSendsLaunchEvents = true config.allowsConstrainedNetworkAccess = true config.allowsExpensiveNetworkAccess = true - config.timeoutIntervalForRequest = 60 - config.timeoutIntervalForResource = 3600 // 1 hour for large files return URLSession(configuration: config) }() @@ -201,12 +63,11 @@ public actor ModelDownloader { let siblings: [HFFile] struct HFFile: Decodable { let rfilename: String - let size: Int64? } } /// Fetch the file list for a model from the HuggingFace REST API. - private func fetchFileList(modelId: String) async throws -> [(name: String, size: Int64?)] { + private func fetchFileList(modelId: String) async throws -> [String] { let url = URL(string: "https://huggingface.co/api/models/\(modelId)")! let (data, response) = try await URLSession.shared.data(from: url) guard let http = response as? 
HTTPURLResponse, http.statusCode == 200 else { @@ -214,211 +75,37 @@ public actor ModelDownloader { } let info = try JSONDecoder().decode(HFModelInfo.self, from: data) return info.siblings + .map { $0.rfilename } .filter { name in - !name.rfilename.hasSuffix(".bin") // Skip PyTorch weights - && !name.rfilename.hasSuffix(".ot") - && !name.rfilename.contains(".gguf") + !name.hasSuffix(".bin") // Skip PyTorch weights + && !name.hasSuffix(".ot") + && !name.contains(".gguf") } - .map { ($0.rfilename, $0.size) } } - /// Download a single file from HuggingFace to `targetDir` with resume support. - /// - /// Uses a `.incomplete` suffix for in-progress downloads. If a partial file - /// exists from a previous attempt, sends an HTTP Range header to resume. - private func downloadFile( - modelId: String, - fileName: String, - expectedSize: Int64?, - targetDir: URL, - speedTracker: DownloadSpeedTracker, - onProgress: @Sendable (Double, Double?) -> Void - ) async throws { + /// Download a single file from HuggingFace to `targetDir`. + private func downloadFile(modelId: String, fileName: String, targetDir: URL) async throws { let fileURL = URL(string: "https://huggingface.co/\(modelId)/resolve/main/\(fileName)")! let destURL = targetDir.appendingPathComponent(fileName) - let incompleteURL = destURL.appendingPathExtension("incomplete") // Create subdirectories if needed (e.g. for tokenizer/config subpaths) let parentDir = destURL.deletingLastPathComponent() try FileManager.default.createDirectory(at: parentDir, withIntermediateDirectories: true) - // Already downloaded — verify size if known, skip if good - if FileManager.default.fileExists(atPath: destURL.path) { - if let expected = expectedSize { - let actual = (try? FileManager.default.attributesOfItem(atPath: destURL.path)[.size] as? Int64) ?? 0 - if actual == expected { - onProgress(1.0, speedTracker.speedBytesPerSec) - return - } - // Size mismatch — remove and re-download - try? FileManager.default.removeItem(at: destURL) - } else { - onProgress(1.0, speedTracker.speedBytesPerSec) - return - } - } + if FileManager.default.fileExists(atPath: destURL.path) { return } - // Check for a partial download from a previous session - var resumeOffset: Int64 = 0 - if FileManager.default.fileExists(atPath: incompleteURL.path) { - resumeOffset = (try? FileManager.default.attributesOfItem(atPath: incompleteURL.path)[.size] as? Int64) ?? 0 - } - - var request = URLRequest(url: fileURL) - if resumeOffset > 0 { - request.setValue("bytes=\(resumeOffset)-", forHTTPHeaderField: "Range") - } - - // Stream download using bytes(for:) for progress tracking - let (asyncBytes, response) = try await session.bytes(for: request) - guard let http = response as? HTTPURLResponse else { - throw URLError(.badServerResponse) - } - - // Handle 416 Range Not Satisfiable — partial file is stale, restart - if http.statusCode == 416 { - try? FileManager.default.removeItem(at: incompleteURL) - resumeOffset = 0 - speedTracker.reset() - // Retry without Range header - let freshRequest = URLRequest(url: fileURL) - let (freshBytes, freshResponse) = try await session.bytes(for: freshRequest) - guard let freshHttp = freshResponse as? HTTPURLResponse, (200..<300).contains(freshHttp.statusCode) else { - throw URLError(.badServerResponse) - } - let totalSize = freshHttp.expectedContentLength > 0 ? freshHttp.expectedContentLength : (expectedSize ?? 
0) - let writtenSize = try await streamToFile( - asyncBytes: freshBytes, - destURL: incompleteURL, - resumeOffset: 0, - totalSize: totalSize, - speedTracker: speedTracker, - onProgress: onProgress - ) - try validateCompletedDownloadSize( - fileName: fileName, - actualSize: writtenSize, - expectedSize: totalSize > 0 ? totalSize : expectedSize - ) - } else if (200..<300).contains(http.statusCode) { - // 200 = full content (server ignored Range), 206 = partial content (resume worked) - let isResume = (http.statusCode == 206) - if !isResume { - // Server returned full content — discard partial file - try? FileManager.default.removeItem(at: incompleteURL) - resumeOffset = 0 - speedTracker.reset() - } - let totalSize: Int64 - if isResume { - totalSize = resumeOffset + http.expectedContentLength - } else { - totalSize = http.expectedContentLength > 0 ? http.expectedContentLength : (expectedSize ?? 0) - } - let writtenSize = try await streamToFile( - asyncBytes: asyncBytes, - destURL: incompleteURL, - resumeOffset: isResume ? resumeOffset : 0, - totalSize: totalSize, - speedTracker: speedTracker, - onProgress: onProgress - ) - try validateCompletedDownloadSize( - fileName: fileName, - actualSize: writtenSize, - expectedSize: totalSize > 0 ? totalSize : expectedSize - ) - } else { + let (tmpURL, response) = try await backgroundSession.download(from: fileURL) + guard let http = response as? HTTPURLResponse, http.statusCode == 200 else { throw URLError(.badServerResponse) } - - // Atomic move from .incomplete to final destination try? FileManager.default.removeItem(at: destURL) - try FileManager.default.moveItem(at: incompleteURL, to: destURL) - onProgress(1.0, speedTracker.speedBytesPerSec) - } - - /// Stream async bytes to a file, appending if resuming. - private func streamToFile( - asyncBytes: URLSession.AsyncBytes, - destURL: URL, - resumeOffset: Int64, - totalSize: Int64, - speedTracker: DownloadSpeedTracker, - onProgress: @Sendable (Double, Double?) -> Void - ) async throws -> Int64 { - let fileHandle: FileHandle - if resumeOffset > 0, FileManager.default.fileExists(atPath: destURL.path) { - fileHandle = try FileHandle(forWritingTo: destURL) - fileHandle.seekToEndOfFile() - } else { - FileManager.default.createFile(atPath: destURL.path, contents: nil) - fileHandle = try FileHandle(forWritingTo: destURL) - } - defer { try? fileHandle.close() } - - let flushSize = 256 * 1024 // Flush every 256 KB - var lastProgressUpdate = Date() - var bytesWritten: Int64 = resumeOffset - var iterator = asyncBytes.makeAsyncIterator() - var chunkBuffer = [UInt8](repeating: 0, count: flushSize) - - while true { - try Task.checkCancellation() - - var chunkCount = 0 - while chunkCount < chunkBuffer.count, - let byte = try await iterator.next() { - chunkBuffer[chunkCount] = byte - chunkCount += 1 - } - - if chunkCount == 0 { - break - } - - fileHandle.write(Data(chunkBuffer[0..= 0.1 { - lastProgressUpdate = now - if totalSize > 0 { - onProgress(Double(bytesWritten) / Double(totalSize), speedTracker.speedBytesPerSec) - } - } - } - - if totalSize > 0 { - onProgress(Double(bytesWritten) / Double(totalSize), speedTracker.speedBytesPerSec) - } - return bytesWritten - } - - private func validateCompletedDownloadSize( - fileName: String, - actualSize: Int64, - expectedSize: Int64? 
- ) throws { - guard let expectedSize, expectedSize > 0, actualSize != expectedSize else { return } - throw DownloadedFileSizeMismatchError( - fileName: fileName, - expectedSize: expectedSize, - actualSize: actualSize - ) + try FileManager.default.moveItem(at: tmpURL, to: destURL) } /// Download all model files to `ModelStorage.cacheRoot` in the Hugging Face /// hub format expected by `LLMModelFactory.loadContainer()`. - /// - /// Supports: - /// - **Resume after restart**: partial `.incomplete` files are preserved and resumed via HTTP Range - /// - **Automatic retry**: transient network errors retry with exponential backoff public func download( modelId: String, - retryConfig: DownloadRetryConfig = .default, onProgress: @escaping @Sendable (DownloadFileProgress) -> Void ) async throws { let files = try await fetchFileList(modelId: modelId) @@ -435,98 +122,39 @@ public actor ModelDownloader { ) var totalDownloaded: Int64 = 0 - let speedTracker = DownloadSpeedTracker() - for (idx, file) in files.enumerated() { + for (idx, fileName) in files.enumerated() { try Task.checkCancellation() + let startTime = Date() let before = ModelStorage.directorySize(at: snapshotDir) - var lastError: Error? - - // Reset speed tracker per-file so the EWMA starts fresh - speedTracker.reset() - - // Retry loop with exponential backoff - for attempt in 0...retryConfig.maxRetries { - do { - if attempt > 0 { - let delay = retryConfig.delay(for: attempt - 1) - print("[ModelDownloader] Retry \(attempt)/\(retryConfig.maxRetries) for \(file.name) after \(String(format: "%.1f", delay))s") - try await Task.sleep(nanoseconds: UInt64(delay * 1_000_000_000)) - try Task.checkCancellation() - } - - onProgress(DownloadFileProgress( - modelId: modelId, - fileName: file.name, - fileIndex: idx + 1, - fileCount: files.count, - fileFractionCompleted: 0, - totalBytesDownloaded: totalDownloaded, - speedBytesPerSec: speedTracker.speedBytesPerSec, - retryAttempt: attempt - )) - - try await downloadFile( - modelId: modelId, - fileName: file.name, - expectedSize: file.size, - targetDir: snapshotDir, - speedTracker: speedTracker - ) { fraction, speed in - onProgress(DownloadFileProgress( - modelId: modelId, - fileName: file.name, - fileIndex: idx + 1, - fileCount: files.count, - fileFractionCompleted: fraction, - totalBytesDownloaded: totalDownloaded, - speedBytesPerSec: speed, - retryAttempt: attempt - )) - } - - lastError = nil - break // Success — exit retry loop - } catch is CancellationError { - throw CancellationError() - } catch { - lastError = error - print("[ModelDownloader] Download failed for \(file.name): \(error.localizedDescription)") - // Don't retry on non-transient errors - if let urlError = error as? 
URLError { - switch urlError.code { - case .cancelled, .userCancelledAuthentication: - throw error - case .notConnectedToInternet, .networkConnectionLost, - .timedOut, .cannotConnectToHost, .dnsLookupFailed: - continue // Transient — retry - default: - if attempt >= retryConfig.maxRetries { throw error } - continue - } - } - } - } + onProgress(DownloadFileProgress( + modelId: modelId, + fileName: fileName, + fileIndex: idx + 1, + fileCount: files.count, + fileFractionCompleted: 0, + totalBytesDownloaded: totalDownloaded, + speedBytesPerSec: nil + )) - if let error = lastError { - throw error - } + try await downloadFile(modelId: modelId, fileName: fileName, targetDir: snapshotDir) let after = ModelStorage.directorySize(at: snapshotDir) let downloaded = max(0, after - before) totalDownloaded += downloaded + let elapsed = max(Date().timeIntervalSince(startTime), 0.001) + let speed = Double(downloaded) / elapsed onProgress(DownloadFileProgress( modelId: modelId, - fileName: file.name, + fileName: fileName, fileIndex: idx + 1, fileCount: files.count, fileFractionCompleted: 1.0, totalBytesDownloaded: totalDownloaded, - speedBytesPerSec: speedTracker.speedBytesPerSec, - retryAttempt: 0 + speedBytesPerSec: speed )) } } diff --git a/Sources/MLXInferenceCore/ModelStorage.swift b/Sources/MLXInferenceCore/ModelStorage.swift index ddd1e507..758e36be 100644 --- a/Sources/MLXInferenceCore/ModelStorage.swift +++ b/Sources/MLXInferenceCore/ModelStorage.swift @@ -43,216 +43,24 @@ public enum ModelStorage { /// Local cache directory for a model, or nil if not downloaded. public static func cacheDirectory(for modelId: String) -> URL? { - materializedDirectory(for: modelId) ?? hubCacheDirectory(for: modelId) - } - - /// Swift Hub's materialized repository directory. - /// `HubApi(downloadBase: cacheRoot).snapshot(from:)` writes here. - public static func materializedDirectory(for modelId: String) -> URL? { - let dir = materializedDirectoryURL(for: modelId) - return directoryExists(dir) ? dir : nil - } - - private static func materializedDirectoryURL(for modelId: String) -> URL { - cacheRoot - .appendingPathComponent("models", isDirectory: true) - .appendingPathComponent(modelId, isDirectory: true) - } - - /// Hugging Face hub cache directory used by Python tools and older SwiftBuddy paths. - public static func hubCacheDirectory(for modelId: String) -> URL? { - let dir = hubCacheDirectoryURL(for: modelId) - return directoryExists(dir) ? dir : nil - } - - private static func hubCacheDirectoryURL(for modelId: String) -> URL { - cacheRoot.appendingPathComponent(hubDirName(for: modelId), isDirectory: true) - } - - private static func directoryExists(_ url: URL) -> Bool { - var isDirectory: ObjCBool = false - return FileManager.default.fileExists(atPath: url.path, isDirectory: &isDirectory) && isDirectory.boolValue + let dir = cacheRoot.appendingPathComponent(hubDirName(for: modelId)) + return FileManager.default.fileExists(atPath: dir.path) ? dir : nil } /// True if a model's cache directory exists and contains files. // The snapshot directory is where safetensors files live inside the HF hub layout: // /models--org--name/snapshots/main/ public static func snapshotDirectory(for modelId: String) -> URL { - return materializedDirectory(for: modelId) ?? resolvedSnapshotDirectory(for: modelId) ?? cacheRoot + return cacheRoot .appendingPathComponent(hubDirName(for: modelId)) .appendingPathComponent("snapshots/main") } - /// Resolve the active snapshot directory for a model in the Hugging Face hub cache. 
- /// Prefer refs/main because snapshot directories are usually commit hashes, not "main". - public static func resolvedSnapshotDirectory(for modelId: String) -> URL? { - guard let dir = hubCacheDirectory(for: modelId) else { return nil } - - let snapshotsDir = dir.appendingPathComponent("snapshots", isDirectory: true) - guard FileManager.default.fileExists(atPath: snapshotsDir.path) else { return nil } - - let refsMain = dir.appendingPathComponent("refs/main") - if let hashString = try? String(contentsOf: refsMain, encoding: .utf8) - .trimmingCharacters(in: .whitespacesAndNewlines), - !hashString.isEmpty { - let snapshot = snapshotsDir.appendingPathComponent(hashString, isDirectory: true) - if FileManager.default.fileExists(atPath: snapshot.path) { - return snapshot - } - } - - let mainSnapshot = snapshotsDir.appendingPathComponent("main", isDirectory: true) - if FileManager.default.fileExists(atPath: mainSnapshot.path) { - return mainSnapshot - } - - guard let contents = try? FileManager.default.contentsOfDirectory( - at: snapshotsDir, - includingPropertiesForKeys: [.isDirectoryKey], - options: [.skipsHiddenFiles] - ) else { return nil } - - let directories = contents.filter { url in - (try? url.resourceValues(forKeys: [.isDirectoryKey]).isDirectory) == true - } - return directories.count == 1 ? directories[0] : nil - } - public static func isDownloaded(_ modelId: String) -> Bool { - verifyModelIntegrity(for: modelId, logFailures: false) - } - - // MARK: — Model Config Inspection - - /// Read the model's maximum context length from its config.json. - /// Checks `text_config.max_position_embeddings` first (VLM/MoE models), - /// then falls back to top-level `max_position_embeddings`. - public static func readMaxContextLength(for modelId: String) -> Int? { - guard let config = readModelConfig(for: modelId) else { return nil } - - // VLM/MoE models nest the context length in text_config - if let textConfig = config["text_config"] as? [String: Any], - let maxPos = textConfig["max_position_embeddings"] as? Int { - return maxPos - } - - // Standard LLMs have it at top level - if let maxPos = config["max_position_embeddings"] as? Int { - return maxPos - } - - // Fallback: some models use n_ctx or max_seq_len - if let nCtx = config["n_ctx"] as? Int { return nCtx } - if let maxSeq = config["max_seq_len"] as? Int { return maxSeq } - - return nil - } - - /// Read the raw config.json dictionary for a downloaded model. - /// Verifies that all required safetensors files are present in the snapshot directory. - /// This prevents the engine from entering `.ready` state if a download was interrupted or corrupted. 
- public static func verifyModelIntegrity(for modelId: String) -> Bool { - verifyModelIntegrity(for: modelId, logFailures: true) - } - - private static func verifyModelIntegrity(for modelId: String, logFailures: Bool) -> Bool { - if hasIncompleteFiles(for: modelId) { - if logFailures { print("[ModelStorage] Integrity Check Failed: Incomplete download files remain for \(modelId)") } - return false - } - - for directory in modelContentDirectories(for: modelId) { - if validateModelFiles(in: directory, logFailures: logFailures) { - return true - } - } - - if logFailures { print("[ModelStorage] Integrity Check Failed: No valid model files found for \(modelId)") } - return false - } - - private static func modelContentDirectories(for modelId: String) -> [URL] { - var directories: [URL] = [] - if let materialized = materializedDirectory(for: modelId) { - directories.append(materialized) - } - if let snapshot = resolvedSnapshotDirectory(for: modelId), !directories.contains(snapshot) { - directories.append(snapshot) - } - return directories - } - - private static func validateModelFiles(in snapshotDir: URL, logFailures: Bool) -> Bool { - // 0. Verify core metadata files - let requiredJsonFiles = ["config.json", "tokenizer.json"] - for file in requiredJsonFiles { - let path = snapshotDir.appendingPathComponent(file) - if !FileManager.default.fileExists(atPath: path.path) { - // Some models might not have tokenizer.json if they use tokenizer.model, so we only strictly enforce config.json - if file == "config.json" { - if logFailures { print("[ModelStorage] Integrity Check Failed: Missing \(file) in \(snapshotDir.path)") } - return false - } - } else if fileSizeResolvingSymlink(path) == 0 { - if logFailures { print("[ModelStorage] Integrity Check Failed: \(file) is corrupted (0 bytes)") } - return false - } - } - - // 1. Try to read model.safetensors.index.json - let indexJsonPath = snapshotDir.appendingPathComponent("model.safetensors.index.json") - if FileManager.default.fileExists(atPath: indexJsonPath.path) { - guard let data = try? Data(contentsOf: indexJsonPath), - let json = try? JSONSerialization.jsonObject(with: data) as? [String: Any], - let weightMap = json["weight_map"] as? [String: String] else { - return false - } - // Collect all unique safetensors filenames - let requiredFiles = Set(weightMap.values) - var totalShardBytes: Int64 = 0 - for file in requiredFiles { - let filePath = snapshotDir.appendingPathComponent(file) - guard let size = fileSizeResolvingSymlink(filePath) else { - if logFailures { print("[ModelStorage] Integrity Check Failed: Missing \(file)") } - return false - } - guard size > 1024 else { - if logFailures { print("[ModelStorage] Integrity Check Failed: \(file) is too small (\(size) bytes)") } - return false - } - totalShardBytes += size - } - - if let metadata = json["metadata"] as? [String: Any], - let expectedTensorBytes = int64Value(metadata["total_size"]), - totalShardBytes < expectedTensorBytes { - if logFailures { - print("[ModelStorage] Integrity Check Failed: shard bytes \(totalShardBytes) below index total_size \(expectedTensorBytes)") - } - return false - } - return true - } - - // 2. 
If no index.json, it might be a single safetensors file model - let singleSafetensors = snapshotDir.appendingPathComponent("model.safetensors") - if let size = fileSizeResolvingSymlink(singleSafetensors), size > 1024 { - return true - } - - if logFailures { print("[ModelStorage] Integrity Check Failed: No safetensors found in \(snapshotDir.path)") } - return false - } - - public static func readModelConfig(for modelId: String) -> [String: Any]? { - for directory in modelContentDirectories(for: modelId) { - let configPath = directory.appendingPathComponent("config.json") - guard let data = try? Data(contentsOf: configPath), - let config = try? JSONSerialization.jsonObject(with: data) as? [String: Any] - else { continue } - return config - } - return nil + guard let dir = cacheDirectory(for: modelId) else { return false } + // Must have a snapshots subdirectory with content + let snapshots = dir.appendingPathComponent("snapshots") + return FileManager.default.fileExists(atPath: snapshots.path) } // MARK: — Disk Operations @@ -265,20 +73,14 @@ public enum ModelStorage { /// Bytes used by a specific model on disk. public static func sizeOnDisk(for modelId: String) -> Int64 { - associatedDirectories(for: modelId).reduce(Int64(0)) { $0 + directorySize(at: $1) } + guard let dir = cacheDirectory(for: modelId) else { return 0 } + return directorySize(at: dir) } /// Delete all cached files for a model. public static func delete(_ modelId: String) throws { - var firstError: Error? - for dir in associatedDirectories(for: modelId) { - do { - try FileManager.default.removeItem(at: dir) - } catch { - if firstError == nil { firstError = error } - } - } - if let firstError { throw firstError } + guard let dir = cacheDirectory(for: modelId) else { return } + try FileManager.default.removeItem(at: dir) } // MARK: — iCloud Exclusion (iOS) @@ -312,200 +114,31 @@ public enum ModelStorage { ) else { return [] } - var resultsById: [String: ScannedModel] = [:] + var results: [ScannedModel] = [] for dir in contents { - if dir.lastPathComponent.hasPrefix("models--") { - let modelId = dir.lastPathComponent - .replacingOccurrences(of: "^models--", with: "", options: .regularExpression) - .replacingOccurrences(of: "--", with: "/") - addScannedModelIfDownloaded(modelId: modelId, dir: dir, resultsById: &resultsById) - } else if dir.lastPathComponent == "models" { - guard let organizations = try? FileManager.default.contentsOfDirectory( - at: dir, - includingPropertiesForKeys: [.contentModificationDateKey], - options: [.skipsHiddenFiles] - ) else { continue } - - for organization in organizations where directoryExists(organization) { - guard let modelDirs = try? FileManager.default.contentsOfDirectory( - at: organization, - includingPropertiesForKeys: [.contentModificationDateKey], - options: [.skipsHiddenFiles] - ) else { continue } - - for modelDir in modelDirs where directoryExists(modelDir) { - let modelId = "\(organization.lastPathComponent)/\(modelDir.lastPathComponent)" - addScannedModelIfDownloaded(modelId: modelId, dir: modelDir, resultsById: &resultsById) - } - } - } - } - return resultsById.values.sorted { ($0.modifiedDate ?? .distantPast) > ($1.modifiedDate ?? 
.distantPast) } - } + guard dir.lastPathComponent.hasPrefix("models--") else { continue } - private static func addScannedModelIfDownloaded( - modelId: String, - dir: URL, - resultsById: inout [String: ScannedModel] - ) { - guard isDownloaded(modelId) else { return } + // Reverse the naming convention to get the model ID + let modelId = dir.lastPathComponent + .replacingOccurrences(of: "^models--", with: "", options: .regularExpression) + .replacingOccurrences(of: "--", with: "/") - let modified = (try? dir.resourceValues(forKeys: [.contentModificationDateKey]))?.contentModificationDate - let candidate = ScannedModel( - modelId: modelId, - cacheDirectory: cacheDirectory(for: modelId) ?? dir, - sizeBytes: sizeOnDisk(for: modelId), - modifiedDate: modified - ) + // Do NOT filter by ModelCatalog anymore => allow arbitrary downloaded Hugging Face models! + guard isDownloaded(modelId) else { continue } // skip partial downloads - if let existing = resultsById[modelId], - (existing.modifiedDate ?? .distantPast) >= (candidate.modifiedDate ?? .distantPast) { - return + let modified = (try? dir.resourceValues(forKeys: [.contentModificationDateKey]))?.contentModificationDate + results.append(ScannedModel( + modelId: modelId, + cacheDirectory: dir, + sizeBytes: directorySize(at: dir), + modifiedDate: modified + )) } - resultsById[modelId] = candidate - } - - // MARK: — Incomplete Downloads - - /// A model whose download was interrupted and can be resumed. - public struct IncompleteDownload: Identifiable, Sendable { - public let id: String // modelId - public let cacheDirectory: URL - /// Bytes downloaded so far (sum of complete + incomplete files) - public let downloadedBytes: Int64 - /// When the partial download was last modified - public let lastModified: Date? - } - - /// Check whether a model directory has any `.incomplete` partial files (iOS path) - /// or incomplete blobs (macOS HubApi path). - public static func hasIncompleteFiles(for modelId: String) -> Bool { - associatedDirectories(for: modelId).contains { countIncompleteFiles(in: $0) > 0 } - } - - /// Scan the cache root for model directories that have partial downloads - /// but are NOT fully downloaded (i.e. `isDownloaded()` returns false, or - /// the directory contains `.incomplete` files). - public static func scanIncompleteDownloads() -> [IncompleteDownload] { - guard FileManager.default.fileExists(atPath: cacheRoot.path), - let contents = try? FileManager.default.contentsOfDirectory( - at: cacheRoot, - includingPropertiesForKeys: [.contentModificationDateKey], - options: [.skipsHiddenFiles] - ) - else { return [] } - - var resultsById: [String: IncompleteDownload] = [:] - for dir in contents { - if dir.lastPathComponent.hasPrefix("models--") { - let modelId = dir.lastPathComponent - .replacingOccurrences(of: "^models--", with: "", options: .regularExpression) - .replacingOccurrences(of: "--", with: "/") - addIncompleteDownloadIfNeeded(modelId: modelId, dir: dir, resultsById: &resultsById) - } else if dir.lastPathComponent == "models" { - guard let organizations = try? FileManager.default.contentsOfDirectory( - at: dir, - includingPropertiesForKeys: [.contentModificationDateKey], - options: [.skipsHiddenFiles] - ) else { continue } - - for organization in organizations where directoryExists(organization) { - guard let modelDirs = try? 
FileManager.default.contentsOfDirectory( - at: organization, - includingPropertiesForKeys: [.contentModificationDateKey], - options: [.skipsHiddenFiles] - ) else { continue } - - for modelDir in modelDirs where directoryExists(modelDir) { - let modelId = "\(organization.lastPathComponent)/\(modelDir.lastPathComponent)" - addIncompleteDownloadIfNeeded(modelId: modelId, dir: modelDir, resultsById: &resultsById) - } - } - } - } - return resultsById.values.sorted { ($0.lastModified ?? .distantPast) > ($1.lastModified ?? .distantPast) } - } - - private static func addIncompleteDownloadIfNeeded( - modelId: String, - dir: URL, - resultsById: inout [String: IncompleteDownload] - ) { - // Skip fully completed models unless they have leftover .incomplete files. - if isDownloaded(modelId) && !hasIncompleteFiles(for: modelId) { - return - } - - // Must have SOME content (not just an empty directory). - let size = directorySize(at: dir) - guard size > 0 else { return } - - let modified = (try? dir.resourceValues(forKeys: [.contentModificationDateKey]))?.contentModificationDate - let candidate = IncompleteDownload( - id: modelId, - cacheDirectory: dir, - downloadedBytes: size, - lastModified: modified - ) - - if let existing = resultsById[modelId], - (existing.lastModified ?? .distantPast) >= (candidate.lastModified ?? .distantPast) { - return - } - resultsById[modelId] = candidate - } - - /// Count `.incomplete` files in a directory tree. - private static func countIncompleteFiles(in directory: URL) -> Int { - guard let enumerator = FileManager.default.enumerator( - at: directory, - includingPropertiesForKeys: nil, - options: [.skipsHiddenFiles] - ) else { return 0 } - - var count = 0 - for case let fileURL as URL in enumerator { - if fileURL.pathExtension == "incomplete" { - count += 1 - } - } - return count + return results.sorted { ($0.modifiedDate ?? .distantPast) > ($1.modifiedDate ?? .distantPast) } } // MARK: — Helpers - private static func associatedDirectories(for modelId: String) -> [URL] { - let candidates = [ - materializedDirectoryURL(for: modelId), - hubCacheDirectoryURL(for: modelId), - ] - - var seen = Set() - return candidates.filter { url in - guard directoryExists(url), !seen.contains(url.path) else { return false } - seen.insert(url.path) - return true - } - } - - private static func fileSizeResolvingSymlink(_ url: URL) -> Int64? { - let resolved = url.resolvingSymlinksInPath() - guard let attrs = try? FileManager.default.attributesOfItem(atPath: resolved.path) else { return nil } - if let size = attrs[.size] as? Int64 { return size } - if let size = attrs[.size] as? NSNumber { return size.int64Value } - return nil - } - - private static func int64Value(_ value: Any?) -> Int64? { - switch value { - case let value as Int64: return value - case let value as Int: return Int64(value) - case let value as NSNumber: return value.int64Value - case let value as String: return Int64(value) - default: return nil - } - } - private static func ensureDirectory(_ url: URL) { guard !FileManager.default.fileExists(atPath: url.path) else { return } try? 
diff --git a/Sources/SwiftLM/ModelProfiler.swift b/Sources/SwiftLM/ModelProfiler.swift
index ea5f76a8..2cf40d85 100644
--- a/Sources/SwiftLM/ModelProfiler.swift
+++ b/Sources/SwiftLM/ModelProfiler.swift
@@ -12,6 +12,16 @@
 
 import Foundation
 import MLX
+import Darwin
+
+// C-layout mirror of xsw_usage from <sys/sysctl.h>, filled in by the vm.swapusage sysctl.
+struct xsw_usage {
+    var xsu_total: UInt64
+    var xsu_used: UInt64
+    var xsu_avail: UInt64
+    var xsu_pagesize: UInt32
+    var xsu_encrypted: Int32
+}
 
 // MARK: - Model Profile
 
@@ -80,8 +90,10 @@ struct SystemProfile: Sendable {
     let totalRAMBytes: UInt64
     let gpuArchitecture: String
     let recommendedWorkingSetBytes: Int
+    let swapUsedBytes: UInt64
 
     var totalRAMGB: Double { Double(totalRAMBytes) / 1e9 }
+    var swapUsedGB: Double { Double(swapUsedBytes) / 1e9 }
 
     /// RAM available for the model after reserving space for macOS (~4GB)
     var availableRAMGB: Double { max(0, totalRAMGB - 4.0) }
 }
@@ -204,6 +216,9 @@ enum ModelProfiler {
         let headDim: Int?
         let intermediateSize: Int?
         let vocabSize: Int?
+        let numExperts: Int?
+        let numExpertsAlt: Int?
+        let numExpertsPerTok: Int?
 
         enum CodingKeys: String, CodingKey {
             case numHiddenLayers = "num_hidden_layers"
@@ -213,6 +228,9 @@
             case headDim = "head_dim"
             case intermediateSize = "intermediate_size"
             case vocabSize = "vocab_size"
+            case numExperts = "num_local_experts"
+            case numExpertsAlt = "num_experts"
+            case numExpertsPerTok = "num_experts_per_tok"
         }
     }
 
@@ -252,9 +270,9 @@
         let quantBits = config.quantizationConfig?.bits ?? detectQuantBits(modelId: modelId)
 
         // Detect MoE
-        let isMoE = config.numExperts != nil && (config.numExperts ?? 0) > 1
-        let numExperts = config.numExperts
-        let numActiveExperts = config.numExpertsPerTok
+        let numExperts = config.numExperts ?? config.textConfig?.numExperts ?? config.textConfig?.numExpertsAlt
+        let isMoE = (numExperts ?? 0) > 1
+        let numActiveExperts = config.numExpertsPerTok ?? config.textConfig?.numExpertsPerTok
 
         // Measure weight file sizes on disk
         let weightSize = measureWeightFiles(directory: modelDirectory)
@@ -333,10 +351,17 @@
         let physicalBudget = Int(Double(totalRAM) * 0.85) - (4 * 1024 * 1024 * 1024)
         let recommended = max(physicalBudget, Int(deviceInfo.memorySize))
 
+        // Get swap usage
+        var swap = xsw_usage(xsu_total: 0, xsu_used: 0, xsu_avail: 0, xsu_pagesize: 0, xsu_encrypted: 0)
+        var size = MemoryLayout<xsw_usage>.size
+        let result = sysctlbyname("vm.swapusage", &swap, &size, nil, 0)
+        let swapUsed = (result == 0) ? swap.xsu_used : 0
+
         return SystemProfile(
             totalRAMBytes: totalRAM,
             gpuArchitecture: deviceInfo.architecture,
-            recommendedWorkingSetBytes: recommended
+            recommendedWorkingSetBytes: recommended,
+            swapUsedBytes: swapUsed
         )
     }
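
For reference, the `vm.swapusage` read added above can be exercised in isolation. A minimal sketch under this patch's assumptions (it reuses the `xsw_usage` mirror declared at the top of ModelProfiler.swift; `swapUsedBytes()` is a hypothetical helper, not part of this change):

    import Darwin

    // Returns bytes of swap currently in use, or 0 if the sysctl fails.
    func swapUsedBytes() -> UInt64 {
        var swap = xsw_usage(xsu_total: 0, xsu_used: 0, xsu_avail: 0,
                             xsu_pagesize: 0, xsu_encrypted: 0)
        var size = MemoryLayout<xsw_usage>.size
        guard sysctlbyname("vm.swapusage", &swap, &size, nil, 0) == 0 else { return 0 }
        return swap.xsu_used
    }
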
@@ -349,24 +374,29 @@
             : model.estimatedParamsB * (Double(model.quantBits) / 8.0)
         let draftGB = Double(draftWeightBytes) / 1e9
         let kvGB = model.kvCacheMemoryGB(contextLength: contextSize)
+
+        // MoE Hardening: Reserve a safety buffer for expert activation spikes.
+        // Even with SSD streaming, some buffers are resident.
+        let moeBufferGB = model.isMoE ? 2.0 : 0.0
+
         let overheadFactor = 1.2
-        let totalGB = (weightGB + draftGB) * overheadFactor + kvGB
+        let totalRequiredGB = (weightGB + draftGB) * overheadFactor + kvGB + moeBufferGB
         let availableGB = system.availableRAMGB
-        let overcommit = totalGB / availableGB
+        let overcommit = totalRequiredGB / availableGB
 
         var warnings: [String] = []
 
         // Determine strategy
         let strategy: PartitionStrategy
-        if totalGB <= availableGB * 0.85 {
+        if totalRequiredGB <= availableGB * 0.85 {
             strategy = .fullGPU
-        } else if totalGB <= availableGB {
+        } else if totalRequiredGB <= availableGB {
             strategy = .fullGPU
-            warnings.append("Model uses >\(Int(totalGB / availableGB * 100))% of available RAM. Performance may degrade under memory pressure.")
-        } else if totalGB <= availableGB * 2.0 {
+            warnings.append("Model uses >\(Int(totalRequiredGB / availableGB * 100))% of available RAM. Performance may degrade under memory pressure.")
+        } else if totalRequiredGB <= availableGB * 2.0 {
             strategy = .swapAssisted
             warnings.append("Model exceeds RAM by \(Int((overcommit - 1) * 100))%. macOS swap will be used. Expect 2-4× slowdown.")
-        } else if totalGB <= availableGB * 4.0 {
+        } else if totalRequiredGB <= availableGB * 4.0 {
             strategy = .layerPartitioned
             warnings.append("Model is \(String(format: "%.1f", overcommit))× system RAM. Layer partitioning needed for usable performance.")
             warnings.append("GPU/CPU layer split is not yet available in MLX Swift. Falling back to swap-assisted mode.")
@@ -431,7 +461,7 @@
             strategy: strategy,
             weightMemoryGB: weightGB,
             kvCacheMemoryGB: kvGB,
-            totalRequiredGB: totalGB,
+            totalRequiredGB: totalRequiredGB,
             systemRAMGB: system.totalRAMGB,
             availableRAMGB: availableGB,
             overcommitRatio: overcommit,
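
To make the effect of the new `moeBufferGB` term concrete, a worked example with illustrative numbers (not benchmark output): a hypothetical 26B-parameter MoE model quantized to 4 bits, no draft model, on a 64 GB machine.

    // All figures are decimal GB, matching totalRAMGB's 1e9 divisor.
    let weightGB        = 26.0 * (4.0 / 8.0)  // ~13.0 GB estimated weights
    let kvGB            = 2.1                 // example KV cache at the chosen context
    let moeBufferGB     = 2.0                 // the MoE headroom added by this patch
    let totalRequiredGB = weightGB * 1.2 + kvGB + moeBufferGB  // ~19.7 GB
    let availableGB     = 68.7 - 4.0          // 64 GiB reported as ~68.7 decimal GB
    // 19.7 <= 0.85 * 64.7 (~55.0), so this plan stays .fullGPU. The same 2 GB
    // buffer can push a borderline model into the warning band or .swapAssisted,
    // which is the intended safety margin for expert activation spikes.
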
diff --git a/Sources/SwiftLM/Server.swift b/Sources/SwiftLM/Server.swift
index d1298ac2..2827a24f 100644
--- a/Sources/SwiftLM/Server.swift
+++ b/Sources/SwiftLM/Server.swift
@@ -452,6 +452,9 @@ struct MLXServer: AsyncParsableCommand {
         let profile = mainModelProfile ?? ModelProfiler.profile(modelDirectory: modelDir, modelId: modelId)
         if let profile = profile {
             let system = ModelProfiler.systemProfile()
+            if system.swapUsedGB > 1.0 {
+                print("[SwiftLM] ⚠️ High swap usage detected: \(String(format: "%.1f", system.swapUsedGB))GB used. Performance may be degraded.")
+            }
             let contextSize = self.ctxSize ?? 4096
             let plan = ModelProfiler.plan(model: profile, system: system, contextSize: contextSize, draftWeightBytes: draftFootprintBytes)
             partitionPlan = plan
@@ -471,7 +474,7 @@ struct MLXServer: AsyncParsableCommand {
                 // SSD Streaming: expert weights are mmap'd from SSD via the OS page cache.
                 // No swap involved — the page cache evicts stale expert pages cleanly.
                 // draftFootprintBytes pre-computed once above (Copilot review).
-                let physicalBudget = computeSSDMemoryBudget(totalRAMBytes: system.totalRAMBytes, draftWeightBytes: draftFootprintBytes)
+                let physicalBudget = computeSSDMemoryBudget(totalRAMBytes: system.totalRAMBytes, isMoE: profile.isMoE, draftWeightBytes: draftFootprintBytes)
                 Memory.cacheLimit = physicalBudget
                 print("[SwiftLM] 💾 Memory strategy: SSD STREAMING (page-cache managed, \(physicalBudget / (1024*1024*1024))GB RAM budget, no swap)")
             } else {
@@ -482,7 +485,7 @@ struct MLXServer: AsyncParsableCommand {
         case .layerPartitioned:
             if self.streamExperts {
                 // draftFootprintBytes pre-computed once above (Copilot review).
-                let physicalBudget = computeSSDMemoryBudget(totalRAMBytes: system.totalRAMBytes, draftWeightBytes: draftFootprintBytes)
+                let physicalBudget = computeSSDMemoryBudget(totalRAMBytes: system.totalRAMBytes, isMoE: profile.isMoE, draftWeightBytes: draftFootprintBytes)
                 Memory.cacheLimit = physicalBudget
                 print("[SwiftLM] 💾 Memory strategy: SSD STREAMING (page-cache managed, \(physicalBudget / (1024*1024*1024))GB RAM budget, no swap)")
             } else {
@@ -726,7 +729,7 @@ struct MLXServer: AsyncParsableCommand {
                 print("[SwiftLM] 🚀 PAPPS 16-Worker Thread Pool prefetcher enabled!")
             }
         } else {
-            print("[SwiftLM] ⚠️ Model does not support SSD expert streaming")
+            print("[SwiftLM] ⚠️ Model does not support SSD expert streaming (check the model architecture for QuantizedSwitchLinear layers)")
         }
     }
@@ -1058,9 +1061,10 @@ struct ServerConfig: Sendable {
 /// Subtracted so the draft model's resident pages don't push the main model's
 /// page cache over the physical limit and trigger swap (Issue #72).
 /// - Returns: The recommended `Memory.cacheLimit` value in bytes.
-func computeSSDMemoryBudget(totalRAMBytes: UInt64, draftWeightBytes: Int = 0) -> Int {
+func computeSSDMemoryBudget(totalRAMBytes: UInt64, isMoE: Bool = false, draftWeightBytes: Int = 0) -> Int {
     let osHeadroom = 4 * 1024 * 1024 * 1024 // 4 GB for OS + system processes
-    let raw = Int(Double(totalRAMBytes) * 0.85) - osHeadroom - draftWeightBytes
+    let moeBuffer = isMoE ? 2 * 1024 * 1024 * 1024 : 0 // 2 GB safety buffer for dynamic expert activation
+    let raw = Int(Double(totalRAMBytes) * 0.85) - osHeadroom - draftWeightBytes - moeBuffer
     return max(raw, 2 * 1024 * 1024 * 1024) // floor at 2 GB
 }
@@ -1180,24 +1184,7 @@ actor PromptCache {
         if cache.contains(where: { $0 is MambaCache }) { return }
 
-        let P = tokens.count
-        // For attention KVCacheSimple layers, the state tensor is [B, H, T, D] with a
-        // pre-allocated T that can exceed the actual prompt length P. If we store the
-        // full over-sized buffer, restore()'s trim() by (cached.tokens.count - matchLen)
-        // still leaves T - P slots of garbage beyond the valid prefix. Slice T to P at
-        // save time so cached.tokens.count === cached state's T.
-        let states: [[MLXArray]] = cache.map { layer -> [MLXArray] in
-            let s = layer.state
-            if layer is KVCacheSimple {
-                return s.map { arr -> MLXArray in
-                    guard arr.ndim >= 3 else { return arr }
-                    let T = arr.dim(2)
-                    if T > P { return arr[.ellipsis, ..