Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,7 @@ public BackgroundTaskQueue getTasks() {
destVolume);
queue.add(task);
inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID()));
deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
- toBalanceContainer.getBytesUsed());
reserveDeltaSize(sourceVolume, toBalanceContainer.getBytesUsed());
}
}
}
Expand Down Expand Up @@ -492,6 +491,30 @@ private boolean shouldDelay() {
return false;
}

/**
 * Subtracts {@code bytes} from the delta recorded for {@code sourceVolume}.
 * A volume without an entry is treated as having a delta of zero, and the
 * entry is dropped when the resulting delta is exactly zero.
 *
 * @param sourceVolume volume whose recorded delta is adjusted
 * @param bytes number of bytes to subtract from the current delta
 */
private void reserveDeltaSize(HddsVolume sourceVolume, long bytes) {
  // Several DiskBalancer tasks may adjust the delta of the same source
  // volume. Doing the read-modify-write inside one compute call avoids the
  // lost updates a separate get followed by put could produce.
  deltaSizes.compute(sourceVolume, (volume, current) -> {
    final long base = current != null ? current : 0L;
    final long remaining = base - bytes;
    if (remaining == 0L) {
      // A null result removes the mapping instead of storing a zero.
      return null;
    }
    return remaining;
  });
}

/**
 * Adds {@code bytes} back to the delta recorded for {@code sourceVolume},
 * undoing a matching {@code reserveDeltaSize} call once a DiskBalancer task
 * completes. The entry is dropped when the resulting delta is exactly zero.
 *
 * <p>If no entry exists, nothing is stored. A warning is logged only when
 * {@code bytes} is non-zero: {@code reserveDeltaSize} leaves no entry behind
 * for a zero-byte reservation, so a zero-byte release without an entry is
 * the expected counterpart and not a sign of a mismatch.
 *
 * @param sourceVolume volume whose recorded delta is adjusted
 * @param bytes number of previously reserved bytes to give back
 */
private void releaseDeltaSize(HddsVolume sourceVolume, long bytes) {
  // Mirror reserveDeltaSize: perform the read-modify-write in a single
  // compute call rather than a separate get followed by put.
  deltaSizes.compute(sourceVolume, (volume, current) -> {
    if (current == null) {
      if (bytes != 0L) {
        // A non-zero release with nothing reserved means reserve/release
        // calls are unbalanced; report it but keep the map unchanged.
        LOG.warn("No reserved delta size found for source volume {} when "
            + "releasing {} bytes.", sourceVolume, bytes);
      }
      return null;
    }
    long updated = current + bytes;
    // A null result removes the mapping instead of storing a zero.
    return updated == 0L ? null : updated;
  });
}

protected class DiskBalancerTask implements BackgroundTask {

private HddsVolume sourceVolume;
Expand Down Expand Up @@ -654,8 +677,7 @@ public int getPriority() {

private void postCall(boolean success, long startTime) {
inProgressContainers.remove(ContainerID.valueOf(containerData.getContainerID()));
deltaSizes.put(sourceVolume, deltaSizes.get(sourceVolume) +
containerData.getBytesUsed());
releaseDeltaSize(sourceVolume, containerData.getBytesUsed());
destVolume.incCommittedBytes(0 - containerData.getBytesUsed());
long endTime = Time.monotonicNow();
if (success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,24 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Expand Down Expand Up @@ -87,6 +92,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -274,8 +280,7 @@ public void moveSuccess(State containerState) throws IOException {
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
long initialDestCommitted = destVolume.getCommittedBytes();
long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
long initialSourceDelta = getDeltaSize(sourceVolume);

Container container = createContainer(CONTAINER_ID, sourceVolume, containerState);
State originalState = container.getContainerState();
Expand Down Expand Up @@ -305,7 +310,7 @@ public void moveSuccess(State containerState) throws IOException {
assertEquals(1, diskBalancerService.getMetrics().getSuccessCount());
assertEquals(CONTAINER_SIZE, diskBalancerService.getMetrics().getSuccessBytes());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
assertEquals(initialSourceDelta, getDeltaSize(sourceVolume));

containerIterator = sourceVolume.getContainerIterator();
assertFalse(containerIterator.hasNext());
Expand All @@ -314,6 +319,37 @@ public void moveSuccess(State containerState) throws IOException {
assertEquals(CONTAINER_ID, containerIterator.next());
}

/**
 * Runs {@code postCall} for two tasks on the same source volume in parallel
 * and checks that both reserved deltas and both committed-byte reservations
 * are given back. The service's delta map is replaced with a
 * {@link RacingDeltaMap} so that any implementation based on a plain
 * {@code get} followed by {@code put} would read the same stale value in
 * both threads.
 */
@Test
public void concurrentPostCallRestoresDeltaSizeAtomically()
    throws Exception {
  // postCall is private, so it is reached through reflection.
  Method postCallMethod = DiskBalancerService.DiskBalancerTask.class
      .getDeclaredMethod("postCall", boolean.class, long.class);
  postCallMethod.setAccessible(true);

  // Pre-load the state two in-flight tasks would have left behind.
  RacingDeltaMap racingDeltas = new RacingDeltaMap(sourceVolume);
  racingDeltas.put(sourceVolume, -2 * CONTAINER_SIZE);
  setDeltaSizes(racingDeltas);
  destVolume.incCommittedBytes(2 * CONTAINER_SIZE);

  DiskBalancerService.DiskBalancerTask firstTask =
      newTask(CONTAINER_ID + 1);
  DiskBalancerService.DiskBalancerTask secondTask =
      newTask(CONTAINER_ID + 2);

  // From here on, get() calls for the source volume wait for each other.
  racingDeltas.enableRacingGets();
  CompletableFuture<Void> firstRun = CompletableFuture.runAsync(
      () -> invokePostCall(postCallMethod, firstTask));
  CompletableFuture<Void> secondRun = CompletableFuture.runAsync(
      () -> invokePostCall(postCallMethod, secondTask));

  firstRun.get(5, TimeUnit.SECONDS);
  secondRun.get(5, TimeUnit.SECONDS);

  // Stop racing before reading the result back through the same map.
  racingDeltas.disableRacingGets();
  assertEquals(0L, racingDeltas.getOrDefault(sourceVolume, 0L));
  assertEquals(0L, destVolume.getCommittedBytes());
}

@ContainerTestVersionInfo.ContainerTest
public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo)
throws IOException, InterruptedException, TimeoutException, ExecutionException {
Expand All @@ -323,8 +359,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo)
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
long initialDestCommitted = destVolume.getCommittedBytes();
long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
long initialSourceDelta = getDeltaSize(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();

// verify temp container directory doesn't exist before task execution
Expand Down Expand Up @@ -363,7 +398,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo)
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
assertEquals(initialSourceDelta, getDeltaSize(sourceVolume));
}

@ContainerTestVersionInfo.ContainerTest
Expand All @@ -375,8 +410,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo)
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
long initialDestCommitted = destVolume.getCommittedBytes();
long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
long initialSourceDelta = getDeltaSize(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();
Path tempDir = destVolume.getTmpDir().toPath()
.resolve(DISK_BALANCER_DIR)
Expand Down Expand Up @@ -427,7 +461,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo)
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
assertEquals(initialSourceDelta, getDeltaSize(sourceVolume));
}

@ContainerTestVersionInfo.ContainerTest
Expand All @@ -439,8 +473,7 @@ public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo)
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
long initialDestCommitted = destVolume.getCommittedBytes();
long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
long initialSourceDelta = getDeltaSize(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();
Path destDirPath = Paths.get(
KeyValueContainerLocationUtil.getBaseContainerLocation(
Expand Down Expand Up @@ -492,7 +525,7 @@ public void moveFailsDuringInMemoryUpdate(ContainerTestVersionInfo versionInfo)
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
assertEquals(initialSourceDelta, getDeltaSize(sourceVolume));
}

@ContainerTestVersionInfo.ContainerTest
Expand All @@ -503,8 +536,7 @@ public void moveFailsDuringOldContainerRemove(ContainerTestVersionInfo versionIn
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
long initialDestCommitted = destVolume.getCommittedBytes();
long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
long initialSourceDelta = getDeltaSize(sourceVolume);

// Use a static mock for the KeyValueContainer utility class
try (MockedStatic<KeyValueContainerUtil> mockedUtil =
Expand Down Expand Up @@ -541,7 +573,7 @@ public void moveFailsDuringOldContainerRemove(ContainerTestVersionInfo versionIn
destVolume.getCurrentUsage().getUsedSpace());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
assertEquals(initialSourceDelta, getDeltaSize(sourceVolume));
}
}

Expand All @@ -553,8 +585,7 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
long initialDestCommitted = destVolume.getCommittedBytes();
long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
long initialSourceDelta = getDeltaSize(sourceVolume);

GenericTestUtils.LogCapturer serviceLog = GenericTestUtils.LogCapturer.captureLogs(DiskBalancerService.class);
DiskBalancerService.DiskBalancerTask task = getTask();
Expand All @@ -575,7 +606,7 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio
assertEquals(1, diskBalancerService.getMetrics().getFailureCount());
assertEquals(initialDestCommitted, destVolume.getCommittedBytes());
assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
assertEquals(initialSourceDelta, getDeltaSize(sourceVolume));
}

@ContainerTestVersionInfo.ContainerTest
Expand Down Expand Up @@ -622,8 +653,7 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState)
long initialSourceUsed = sourceVolume.getCurrentUsage().getUsedSpace();
long initialDestUsed = destVolume.getCurrentUsage().getUsedSpace();
long initialDestCommitted = destVolume.getCommittedBytes();
long initialSourceDelta = diskBalancerService.getDeltaSizes().get(sourceVolume) == null ?
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
long initialSourceDelta = getDeltaSize(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();

// Verify temp container directory doesn't exist before task execution
Expand Down Expand Up @@ -676,7 +706,104 @@ public void testMoveSkippedWhenContainerStateChanged(State invalidState)
assertFalse(diskBalancerService.getInProgressContainers().contains(ContainerID.valueOf(CONTAINER_ID)));

// Verify delta sizes are restored
assertEquals(initialSourceDelta, diskBalancerService.getDeltaSizes().get(sourceVolume));
assertEquals(initialSourceDelta, getDeltaSize(sourceVolume));
}

/**
 * Builds a task that moves a freshly described container of
 * {@code CONTAINER_SIZE} bytes from {@code sourceVolume} to
 * {@code destVolume}.
 *
 * @param containerId id given to the new container data
 * @return a task bound to {@code diskBalancerService}
 */
private DiskBalancerService.DiskBalancerTask newTask(long containerId) {
  final String randomId = UUID.randomUUID().toString();
  final KeyValueContainerData data = new KeyValueContainerData(
      containerId, ContainerLayoutVersion.FILE_PER_BLOCK, CONTAINER_SIZE,
      randomId, datanodeUuid);
  data.getStatistics().setBlockBytesForTesting(CONTAINER_SIZE);
  return diskBalancerService.new DiskBalancerTask(
      data, sourceVolume, destVolume);
}

/**
 * Reflectively calls {@code postCall(false, 1)} on the given task, turning
 * any reflection failure into an {@link AssertionError} so it can be used
 * from a lambda that cannot throw checked exceptions.
 *
 * @param postCall handle to the task's {@code postCall} method
 * @param task task instance to invoke the method on
 */
private void invokePostCall(Method postCall,
    DiskBalancerService.DiskBalancerTask task) {
  final boolean success = false;
  final long startTime = TimeUnit.MILLISECONDS.toMillis(1);
  try {
    postCall.invoke(task, success, startTime);
  } catch (ReflectiveOperationException reflectionFailure) {
    throw new AssertionError(reflectionFailure);
  }
}

/**
 * Replaces the {@code deltaSizes} field of {@code diskBalancerService} with
 * the supplied map using reflection.
 *
 * @param deltaSizes map to install in the service
 * @throws ReflectiveOperationException if the field cannot be found or set
 */
private void setDeltaSizes(Map<HddsVolume, Long> deltaSizes)
    throws ReflectiveOperationException {
  final Field deltaSizesField =
      DiskBalancerService.class.getDeclaredField("deltaSizes");
  deltaSizesField.setAccessible(true);
  deltaSizesField.set(diskBalancerService, deltaSizes);
}

/**
 * Returns the delta currently recorded by the service for {@code volume},
 * or {@code 0} when the service has no entry for it.
 *
 * @param volume volume to look up
 * @return the recorded delta, defaulting to zero
 */
private long getDeltaSize(HddsVolume volume) {
  final Map<HddsVolume, Long> recordedDeltas =
      diskBalancerService.getDeltaSizes();
  return recordedDeltas.getOrDefault(volume, 0L);
}

/**
 * Test-only map that makes a non-atomic {@code get}/{@code put} update
 * sequence lose an update deterministically.
 *
 * <p>While racing is enabled, every {@link #get(Object)} for the raced
 * volume first reads the stored value and then waits (up to 5 seconds)
 * until a second such call has also read its value, so both callers return
 * the same snapshot. {@link #compute} runs entirely under the map's monitor
 * and never goes through {@link #get(Object)}, so it is unaffected.
 */
private static final class RacingDeltaMap
    extends AbstractMap<HddsVolume, Long> {
  private final Map<HddsVolume, Long> entries = new HashMap<>();
  private final HddsVolume racedVolume;
  // One-shot latch: released once two raced get() calls have arrived.
  private final CountDownLatch concurrentGets = new CountDownLatch(2);
  private volatile boolean raceGets;

  private RacingDeltaMap(HddsVolume racedVolume) {
    this.racedVolume = racedVolume;
  }

  /** Makes subsequent gets for the raced volume wait for each other. */
  private void enableRacingGets() {
    raceGets = true;
  }

  /** Restores plain, non-blocking get behaviour. */
  private void disableRacingGets() {
    raceGets = false;
  }

  @Override
  public Long get(Object key) {
    // Take the snapshot first; the wait below happens outside the monitor
    // so the other caller can read the same value.
    final Long snapshot;
    synchronized (this) {
      snapshot = entries.get(key);
    }
    // Identity comparison: only the exact raced volume instance is stalled.
    if (raceGets && key == racedVolume) {
      awaitOtherGet();
    }
    return snapshot;
  }

  /**
   * Registers this caller on the latch and blocks until the latch opens.
   * Fails the test if that does not happen within 5 seconds or if the
   * waiting thread is interrupted.
   */
  private void awaitOtherGet() {
    concurrentGets.countDown();
    boolean released;
    try {
      released = concurrentGets.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new AssertionError(e);
    }
    if (!released) {
      throw new AssertionError("Timed out waiting for concurrent gets");
    }
  }

  @Override
  public Long put(HddsVolume key, Long value) {
    synchronized (this) {
      return entries.put(key, value);
    }
  }

  @Override
  public Long remove(Object key) {
    synchronized (this) {
      return entries.remove(key);
    }
  }

  @Override
  public Long compute(HddsVolume key,
      BiFunction<? super HddsVolume, ? super Long, ? extends Long>
          remappingFunction) {
    synchronized (this) {
      final Long result = remappingFunction.apply(key, entries.get(key));
      if (result != null) {
        entries.put(key, result);
      } else {
        // A null result means the mapping must be removed.
        entries.remove(key);
      }
      return result;
    }
  }

  @Override
  public Set<Entry<HddsVolume, Long>> entrySet() {
    synchronized (this) {
      return entries.entrySet();
    }
  }
}

private KeyValueContainer createContainer(long containerId, HddsVolume vol, State state)
Expand Down