diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 89e7a3058f3d0..bd479ffc6d2a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -2133,6 +2133,15 @@ class RocksDB( nativeStats.close() rocksDbOptions.close() dbLogger.close() + // In unbounded memory mode each RocksDB instance owns its LRUCache. Without explicit + // close() the native C++ cache object is only freed when the JVM GC finalizes the Java + // wrapper -- which rarely happens under low heap pressure. Closing explicitly here + // ensures native memory is reclaimed deterministically when the instance is released. + // In bounded mode the cache is a shared singleton managed by RocksDBMemoryManager + // and must not be closed here. + if (!conf.boundedMemoryUsage && lruCache != null) { + lruCache.close() + } var snapshot = snapshotsToUploadQueue.poll() while (snapshot != null) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 87560d1749562..dc697f5b99dc5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -3276,6 +3276,49 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + Seq(true, false).foreach { boundedMemoryUsage => + testWithColumnFamilies( + s"SPARK-57183: LRUCache is handled correctly on RocksDB.close() " + + s"with boundedMemoryUsage=$boundedMemoryUsage", + TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled => + withTempDir { dir => + try { + val sqlConf = new SQLConf + sqlConf.setConfString( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." + + RocksDBConf.BOUNDED_MEMORY_USAGE_CONF_KEY, boundedMemoryUsage.toString) + if (boundedMemoryUsage) { + sqlConf.setConfString( + RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + "." + + RocksDBConf.MAX_MEMORY_USAGE_MB_CONF_KEY, "128") + } + val dbConf = RocksDBConf(StateStoreConf(sqlConf)) + + val (_, cache) = withDB(dir.getCanonicalPath, conf = dbConf, + useColumnFamilies = colFamiliesEnabled) { db => + db.load(0) + db.put("k", "v") + db.commit() + db.getWriteBufferManagerAndCache() + } + if (boundedMemoryUsage) { + // Shared singleton -- must remain open after a single instance closes + assert(cache.isOwningHandle, + "Shared LRUCache handle must not be released after a single RocksDB.close() " + + "in bounded mode") + } else { + // Per-instance cache -- must be released deterministically on close() + assert(!cache.isOwningHandle, + "LRUCache native handle should be released after RocksDB.close() " + + "in unbounded mode") + } + } finally { + RocksDBMemoryManager.resetWriteBufferManagerAndCache + } + } + } + } + Seq("100", "1000", "100000").foreach { totalMemorySizeMB => testWithColumnFamilies(s"Memory mgmt - valid config " + s"with totalMemorySizeMB=$totalMemorySizeMB",