From 720e4a8a756d387a56db0cb5c35df58131bab9a6 Mon Sep 17 00:00:00 2001 From: Ivan Morales Date: Tue, 2 Jun 2026 09:16:29 -0700 Subject: [PATCH] [SPARK-57183][SS] Close LRUCache on RocksDB.close() in unbounded memory mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? In unbounded memory mode (the default, `boundedMemoryUsage = false`), `RocksDBMemoryManager` creates a new `LRUCache` per instance: https://github.com/apache/spark/blob/d7df1920cfb8577c17a9b1555592d3772f061417/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala#L185 but `RocksDB.close()` never calls `lruCache.close()`: https://github.com/apache/spark/blob/d7df1920cfb8577c17a9b1555592d3772f061417/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala#L2125-L2135 The Java `LRUCache` wrapper holds a C++ `shared_ptr`, so the native object is only freed when the JVM GC finalizes the wrapper — which rarely happens under low heap pressure. This causes native memory to accumulate until GC eventually runs, leading to OOM kills in long-running processes or CI runs with many RocksDB-heavy test suites. The fix adds an explicit `lruCache.close()` call in `RocksDB.close()` for unbounded mode. In bounded mode the cache is a shared singleton managed by `RocksDBMemoryManager` and must not be closed per instance. This is a separate issue from SPARK-56523 (Statistics native memory leak), which was already fixed. ### Why are the changes needed? Without explicit `close()`, each `RocksDB` instance in unbounded mode leaks one `LRUCache` worth of native memory (`blockCacheSizeMB`, default 8 MB) for as long as GC does not run. The memory is never reclaimed deterministically. A standalone reproducer tool confirms ~8.5 MB of native memory growth per open/close cycle in leak mode vs flat memory in fixed mode: https://github.com/kete1987/rocksdb-leak-tool ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a test in `RocksDBSuite` (`SPARK-57183: LRUCache is closed on RocksDB.close() in unbounded memory mode`) that verifies the native handle is released after `close()` via `LRUCache.isOwningHandle()`. ### Was this patch authored or co-authored using generative AI tooling? Co-authored with Claude (Anthropic), used for analysis, code generation and review assistance. Generated-by: Claude Sonnet 4.6 I affirm that the contribution is my original work and that I license the work to the project under the project's open source license. Closes #56234 from kete1987/SPARK-57183-rocksdb-lrucache-leak. Authored-by: Ivan Morales Signed-off-by: Anish Shrigondekar --- .../execution/streaming/state/RocksDB.scala | 9 ++++ .../streaming/state/RocksDBSuite.scala | 43 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 89e7a3058f3d0..bd479ffc6d2a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -2133,6 +2133,15 @@ class RocksDB( nativeStats.close() rocksDbOptions.close() dbLogger.close() + // In unbounded memory mode each RocksDB instance owns its LRUCache. Without explicit + // close() the native C++ cache object is only freed when the JVM GC finalizes the Java + // wrapper -- which rarely happens under low heap pressure. Closing explicitly here + // ensures native memory is reclaimed deterministically when the instance is released. + // In bounded mode the cache is a shared singleton managed by RocksDBMemoryManager + // and must not be closed here. + if (!conf.boundedMemoryUsage && lruCache != null) { + lruCache.close() + } var snapshot = snapshotsToUploadQueue.poll() while (snapshot != null) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 87560d1749562..dc697f5b99dc5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3276,6 +3276,49 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + Seq(true, false).foreach { boundedMemoryUsage => + testWithColumnFamilies( + s"SPARK-57183: LRUCache is handled correctly on RocksDB.close() " + + s"with boundedMemoryUsage=$boundedMemoryUsage", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + withTempDir { dir => + try { + val sqlConf = new SQLConf + sqlConf.setConfString( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." + + RocksDBConf.BOUNDED_MEMORY_USAGE_CONF_KEY, boundedMemoryUsage.toString) + if (boundedMemoryUsage) { + sqlConf.setConfString( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." + + RocksDBConf.MAX_MEMORY_USAGE_MB_CONF_KEY, "128") + } + val dbConf = RocksDBConf(StateStoreConf(sqlConf)) + + val (_, cache) = withDB(dir.getCanonicalPath, conf = dbConf, + useColumnFamilies = colFamiliesEnabled) { db => + db.load(0) + db.put("k", "v") + db.commit() + db.getWriteBufferManagerAndCache() + } + if (boundedMemoryUsage) { + // Shared singleton -- must remain open after a single instance closes + assert(cache.isOwningHandle, + "Shared LRUCache handle must not be released after a single RocksDB.close() " + + "in bounded mode") + } else { + // Per-instance cache -- must be released deterministically on close() + assert(!cache.isOwningHandle, + "LRUCache native handle should be released after RocksDB.close() " + + "in unbounded mode") + } + } finally { + RocksDBMemoryManager.resetWriteBufferManagerAndCache + } + } + } + } + Seq("100", "1000", "100000").foreach { totalMemorySizeMB => testWithColumnFamilies(s"Memory mgmt - valid config " + s"with totalMemorySizeMB=$totalMemorySizeMB",