Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.util.concurrent.Striped;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -57,10 +58,14 @@ public boolean getQuota(int permits) {
CompletableFuture.runAsync(() -> {
int p = 0;
try {
for (String key : mappedBuffers.keySet()) {
ByteBuffer buf = mappedBuffers.get(key).get();
if (buf == null) {
mappedBuffers.remove(key);
// remove(key, value) only counts entries we observed cleared,
// so a concurrent put() that replaced the WeakReference is not
// miscounted as freed.
for (Map.Entry<String, WeakReference<ByteBuffer>> entry
: mappedBuffers.entrySet()) {
final WeakReference<ByteBuffer> ref = entry.getValue();
if (ref.get() == null
&& mappedBuffers.remove(entry.getKey(), ref)) {
p++;
}
}
Expand Down Expand Up @@ -93,14 +98,16 @@ public ByteBuffer computeIfAbsent(String file, long position, long size,
Lock fileLock = lock.get(key);
fileLock.lock();
try {
WeakReference<ByteBuffer> refer = mappedBuffers.get(key);
if (refer != null && refer.get() != null) {
// reuse the mapped buffer
// Hold a strong reference for the rest of this method so GC cannot
// clear the WeakReference between the null check and the return.
final WeakReference<ByteBuffer> refer = mappedBuffers.get(key);
final ByteBuffer cached = refer != null ? refer.get() : null;
if (cached != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("find buffer for key {}", key);
}
releaseQuota(1);
return refer.get();
return cached;
}

ByteBuffer buffer = supplier.get();
Expand Down