From 93b976053ba7e8fe452b96deeeb89be5b76163da Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 22 May 2026 09:37:20 +0800 Subject: [PATCH 1/2] HDDS-15346. DiskBalancer should update delta sizes atomically. --- .../diskbalancer/DiskBalancerService.java | 30 +++- .../diskbalancer/TestDiskBalancerTask.java | 130 ++++++++++++++++++ 2 files changed, 156 insertions(+), 4 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index 70d3e8598d4..cee5ac721dd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -443,8 +443,7 @@ public BackgroundTaskQueue getTasks() { destVolume); queue.add(task); inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID())); - deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L) - - toBalanceContainer.getBytesUsed()); + reserveDeltaSize(sourceVolume, toBalanceContainer.getBytesUsed()); } } } @@ -492,6 +491,30 @@ private boolean shouldDelay() { return false; } + private void reserveDeltaSize(HddsVolume sourceVolume, long bytes) { + // deltaSizes can be updated by multiple DiskBalancer tasks for the same + // source volume. Use compute to avoid lost updates from a non-atomic + // get/put sequence. + deltaSizes.compute(sourceVolume, (volume, current) -> { + long updated = (current == null ? 0L : current) - bytes; + return updated == 0L ? null : updated; + }); + } + + private void releaseDeltaSize(HddsVolume sourceVolume, long bytes) { + // Match reserveDeltaSize and release the previously reserved bytes + // atomically when a DiskBalancer task completes. + deltaSizes.compute(sourceVolume, (volume, current) -> { + if (current == null) { + LOG.warn("No reserved delta size found for source volume {} when " + + "releasing {} bytes.", sourceVolume, bytes); + return null; + } + long updated = current + bytes; + return updated == 0L ? null : updated; + }); + } + protected class DiskBalancerTask implements BackgroundTask { private HddsVolume sourceVolume; @@ -654,8 +677,7 @@ public int getPriority() { private void postCall(boolean success, long startTime) { inProgressContainers.remove(ContainerID.valueOf(containerData.getContainerID())); - deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) + - containerData.getBytesUsed()); + releaseDeltaSize(sourceVolume, containerData.getBytesUsed()); destVolume.incCommittedBytes(0 - containerData.getBytesUsed()); long endTime = Time.monotonicNow(); if (success) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java index fc3fdb7b140..61513570408 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java @@ -36,19 +36,24 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.AbstractMap; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -87,6 +92,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -314,6 +320,37 @@ public void moveSuccess(State containerState) throws IOException { assertEquals(CONTAINER_ID, containerIterator.next()); } + @Test + public void concurrentPostCallRestoresDeltaSizeAtomically() + throws Exception { + Method postCall = DiskBalancerService.DiskBalancerTask.class + .getDeclaredMethod("postCall", boolean.class, long.class); + postCall.setAccessible(true); + + RacingDeltaMap deltaSizes = new RacingDeltaMap(sourceVolume); + setDeltaSizes(deltaSizes); + deltaSizes.put(sourceVolume, -2 * CONTAINER_SIZE); + destVolume.incCommittedBytes(2 * CONTAINER_SIZE); + + DiskBalancerService.DiskBalancerTask task1 = + newTask(CONTAINER_ID + 1); + DiskBalancerService.DiskBalancerTask task2 = + newTask(CONTAINER_ID + 2); + + deltaSizes.enableRacingGets(); + CompletableFuture first = CompletableFuture.runAsync( + () -> invokePostCall(postCall, task1)); + CompletableFuture second = CompletableFuture.runAsync( + () -> invokePostCall(postCall, task2)); + + first.get(5, TimeUnit.SECONDS); + second.get(5, TimeUnit.SECONDS); + + deltaSizes.disableRacingGets(); + assertEquals(0L, deltaSizes.getOrDefault(sourceVolume, 0L)); + assertEquals(0L, destVolume.getCommittedBytes()); + } + @ContainerTestVersionInfo.ContainerTest public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo) throws IOException, InterruptedException, TimeoutException, ExecutionException { @@ -679,6 +716,99 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState) assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); } + private DiskBalancerService.DiskBalancerTask newTask(long containerId) { + KeyValueContainerData containerData = new KeyValueContainerData( + containerId, ContainerLayoutVersion.FILE_PER_BLOCK, CONTAINER_SIZE, + UUID.randomUUID().toString(), datanodeUuid); + containerData.getStatistics().setBlockBytesForTesting(CONTAINER_SIZE); + return diskBalancerService.new DiskBalancerTask(containerData, + sourceVolume, destVolume); + } + + private void invokePostCall(Method postCall, + DiskBalancerService.DiskBalancerTask task) { + try { + postCall.invoke(task, false, TimeUnit.MILLISECONDS.toMillis(1)); + } catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + private void setDeltaSizes(Map deltaSizes) + throws ReflectiveOperationException { + Field field = DiskBalancerService.class.getDeclaredField("deltaSizes"); + field.setAccessible(true); + field.set(diskBalancerService, deltaSizes); + } + + private static final class RacingDeltaMap + extends AbstractMap { + private final Map entries = new HashMap<>(); + private final HddsVolume racedVolume; + private final CountDownLatch concurrentGets = new CountDownLatch(2); + private volatile boolean raceGets; + + private RacingDeltaMap(HddsVolume racedVolume) { + this.racedVolume = racedVolume; + } + + private void enableRacingGets() { + raceGets = true; + } + + private void disableRacingGets() { + raceGets = false; + } + + @Override + public Long get(Object key) { + Long value; + synchronized (this) { + value = entries.get(key); + } + if (raceGets && key == racedVolume) { + concurrentGets.countDown(); + try { + if (!concurrentGets.await(5, TimeUnit.SECONDS)) { + throw new AssertionError("Timed out waiting for concurrent gets"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + } + return value; + } + + @Override + public synchronized Long put(HddsVolume key, Long value) { + return entries.put(key, value); + } + + @Override + public synchronized Long remove(Object key) { + return entries.remove(key); + } + + @Override + public synchronized Long compute(HddsVolume key, + BiFunction + remappingFunction) { + Long newValue = remappingFunction.apply(key, entries.get(key)); + if (newValue == null) { + entries.remove(key); + } else { + entries.put(key, newValue); + } + return newValue; + } + + @Override + public synchronized Set> entrySet() { + return entries.entrySet(); + } + } + private KeyValueContainer createContainer(long containerId, HddsVolume vol, State state) throws IOException { KeyValueContainerData containerData = new KeyValueContainerData( From bfaa91b52d9a664d57cadcc693f9882e6e762a3e Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Fri, 22 May 2026 12:02:17 +0800 Subject: [PATCH 2/2] HDDS-15346. DiskBalancer should update delta sizes atomically. --- .../diskbalancer/TestDiskBalancerTask.java | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java index 61513570408..a3ac3552f1b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java @@ -280,8 +280,7 @@ public void moveSuccess(State containerState) throws IOException { long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); long initialDestCommitted = destVolume.getCommittedBytes(); - long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ? - 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); + long initialSourceDelta = getDeltaSize(sourceVolume); Container container = createContainer(CONTAINER_ID, sourceVolume, containerState); State originalState = container.getContainerState(); @@ -311,7 +310,7 @@ public void moveSuccess(State containerState) throws IOException { assertEquals(1, diskBalancerService.getMetrics().getSuccessCount()); assertEquals(CONTAINER_SIZE, diskBalancerService.getMetrics().getSuccessBytes()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); - assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); + assertEquals(initialSourceDelta, getDeltaSize(sourceVolume)); containerIterator = sourceVolume.getContainerIterator(); assertFalse(containerIterator.hasNext()); @@ -360,8 +359,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo) long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); long initialDestCommitted = destVolume.getCommittedBytes(); - long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ? - 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); + long initialSourceDelta = getDeltaSize(sourceVolume); String oldContainerPath = container.getContainerData().getContainerPath(); // verify temp container directory doesn't exist before task execution @@ -400,7 +398,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo) assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); - assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); + assertEquals(initialSourceDelta, getDeltaSize(sourceVolume)); } @ContainerTestVersionInfo.ContainerTest @@ -412,8 +410,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo) long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); long initialDestCommitted = destVolume.getCommittedBytes(); - long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ? - 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); + long initialSourceDelta = getDeltaSize(sourceVolume); String oldContainerPath = container.getContainerData().getContainerPath(); Path tempDir = destVolume.getTmpDir().toPath() .resolve(DISK_BALANCER_DIR) @@ -464,7 +461,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo) assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); - assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); + assertEquals(initialSourceDelta, getDeltaSize(sourceVolume)); } @ContainerTestVersionInfo.ContainerTest @@ -476,8 +473,7 @@ public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo) long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); long initialDestCommitted = destVolume.getCommittedBytes(); - long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ? - 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); + long initialSourceDelta = getDeltaSize(sourceVolume); String oldContainerPath = container.getContainerData().getContainerPath(); Path destDirPath = Paths.get( KeyValueContainerLocationUtil.getBaseContainerLocation( @@ -529,7 +525,7 @@ public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo) assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); - assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); + assertEquals(initialSourceDelta, getDeltaSize(sourceVolume)); } @ContainerTestVersionInfo.ContainerTest @@ -540,8 +536,7 @@ public void moveFailsDuringOldContainerRemove(ContainerTestVersionInfo versionIn long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); long initialDestCommitted = destVolume.getCommittedBytes(); - long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ? - 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); + long initialSourceDelta = getDeltaSize(sourceVolume); // Use a static mock for the KeyValueContainer utility class try (MockedStatic mockedUtil = @@ -578,7 +573,7 @@ public void moveFailsDuringOldContainerRemove(ContainerTestVersionInfo versionIn destVolume.getCurrentUsage().getUsedSpace()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); - assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); + assertEquals(initialSourceDelta, getDeltaSize(sourceVolume)); } } @@ -590,8 +585,7 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); long initialDestCommitted = destVolume.getCommittedBytes(); - long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ? - 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); + long initialSourceDelta = getDeltaSize(sourceVolume); GenericTestUtils.LogCapturer serviceLog = GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class); DiskBalancerService.DiskBalancerTask task = getTask(); @@ -612,7 +606,7 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio assertEquals(1, diskBalancerService.getMetrics().getFailureCount()); assertEquals(initialDestCommitted, destVolume.getCommittedBytes()); assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); - assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); + assertEquals(initialSourceDelta, getDeltaSize(sourceVolume)); } @ContainerTestVersionInfo.ContainerTest @@ -659,8 +653,7 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState) long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace(); long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace(); long initialDestCommitted = destVolume.getCommittedBytes(); - long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ? - 0L : diskBalancerService.getDeltaSizes().get(sourceVolume); + long initialSourceDelta = getDeltaSize(sourceVolume); String oldContainerPath = container.getContainerData().getContainerPath(); // Verify temp container directory doesn't exist before task execution @@ -713,7 +706,7 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState) assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID))); // Verify delta sizes are restored - assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume)); + assertEquals(initialSourceDelta, getDeltaSize(sourceVolume)); } private DiskBalancerService.DiskBalancerTask newTask(long containerId) { @@ -741,6 +734,10 @@ private void setDeltaSizes(Map deltaSizes) field.set(diskBalancerService, deltaSizes); } + private long getDeltaSize(HddsVolume volume) { + return diskBalancerService.getDeltaSizes().getOrDefault(volume, 0L); + } + private static final class RacingDeltaMap extends AbstractMap { private final Map entries = new HashMap<>();