Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private LinkedBlockingDeque<CallRunner> queue;

// so we can calculate actual threshold to switch to LIFO under load
private int maxCapacity;
private volatile int softLimit;

// metrics (shared across all queues)
private LongAdder numGeneralCallsDropped;
Expand All @@ -71,27 +71,30 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicBoolean isOverloaded = new AtomicBoolean(false);

public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) {
this.maxCapacity = capacity;
double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches,
int currentQueueLimit) {
this.queue = new LinkedBlockingDeque<>(capacity);
this.codelTargetDelay = targetDelay;
this.codelInterval = interval;
this.lifoThreshold = lifoThreshold;
this.numGeneralCallsDropped = numGeneralCallsDropped;
this.numLifoModeSwitches = numLifoModeSwitches;
this.softLimit = currentQueueLimit;
}

/**
* Update tunables.
* @param newCodelTargetDelay new CoDel target delay
* @param newCodelInterval new CoDel interval
* @param newLifoThreshold new Adaptive Lifo threshold
* @param currentQueueLimit new soft limit of queue
*/
public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
double newLifoThreshold) {
public void updateTunables(int newCodelTargetDelay, int newCodelInterval, double newLifoThreshold,
int currentQueueLimit) {
this.codelTargetDelay = newCodelTargetDelay;
this.codelInterval = newCodelInterval;
this.lifoThreshold = newLifoThreshold;
this.softLimit = currentQueueLimit;
}

/**
Expand All @@ -104,7 +107,7 @@ public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
public CallRunner take() throws InterruptedException {
CallRunner cr;
while (true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
if (((double) queue.size() / this.softLimit) > lifoThreshold) {
numLifoModeSwitches.increment();
cr = queue.takeLast();
} else {
Expand All @@ -124,7 +127,7 @@ public CallRunner poll() {
CallRunner cr;
boolean switched = false;
while (true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
if (((double) queue.size() / this.softLimit) > lifoThreshold) {
// Only count once per switch.
if (!switched) {
switched = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ public abstract class RpcExecutor {
private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs;

// this is soft limit of the queue, not size/capacity.
protected volatile int currentQueueLimit;
// While initializing we will use hard limit as the capacity of queue, it will let us dynamically
// change the queue limit
protected final int queueHardLimit;

private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final List<RpcHandler> handlers;
Expand Down Expand Up @@ -161,11 +165,13 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
}
currentQueueLimit = maxQueueLength;
queueHardLimit = Math.max(maxQueueLength, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);

if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs =
new Object[] { maxQueueLength, new CallPriorityComparator(conf, priority) };
new Object[] { queueHardLimit, new CallPriorityComparator(conf, priority) };
this.queueClass = BoundedPriorityBlockingQueue.class;
} else if (isCodelQueueType(callQueueType)) {
this.name += ".Codel";
Expand All @@ -174,8 +180,8 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold =
conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
this.queueInitArgs = new Object[] { queueHardLimit, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches, currentQueueLimit };
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
} else if (isPluggableQueueType(callQueueType)) {
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
Expand All @@ -185,12 +191,12 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
throw new PluggableRpcQueueNotFound(
"Pluggable call queue failed to load and selected call" + " queue type required");
} else {
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
this.queueInitArgs = new Object[] { queueHardLimit, priority, conf };
this.queueClass = pluggableQueueClass.get();
}
} else {
this.name += ".Fifo";
this.queueInitArgs = new Object[] { maxQueueLength };
this.queueInitArgs = new Object[] { queueHardLimit };
this.queueClass = LinkedBlockingQueue.class;
}

Expand Down Expand Up @@ -231,20 +237,7 @@ public Map<String, Long> getCallQueueSizeSummary() {
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
}

// This method can only be called ONCE per executor instance.
// Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity)
// After calling: queueInitArgs[0] is set to hard limit and currentQueueLimit stores the original
// soft limit.
// Multiple calls would incorrectly use the hard limit as the soft limit.
// As all the queues has same initArgs and queueClass, there should be no need to call this again.
protected void initializeQueues(final int numQueues) {
if (!queues.isEmpty()) {
throw new RuntimeException("Queues are already initialized");
}
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
}
for (int i = 0; i < numQueues; ++i) {
queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs));
}
Expand Down Expand Up @@ -464,7 +457,15 @@ public void resizeQueues(Configuration conf) {
}
}
final int queueLimit = currentQueueLimit;
currentQueueLimit = conf.getInt(configKey, queueLimit);
int newQueueLimit = conf.getInt(configKey, queueLimit);
if (newQueueLimit > queueHardLimit) {
LOG.warn(
"Requested soft limit {} exceeds queue hard limit/capacity {}. "
+ "A region server restart is required to grow the underlying queue.",
newQueueLimit, queueHardLimit);
newQueueLimit = currentQueueLimit;
}
currentQueueLimit = newQueueLimit;
}

public void onConfigurationChange(Configuration conf) {
Expand All @@ -477,8 +478,10 @@ public void onConfigurationChange(Configuration conf) {

for (BlockingQueue<CallRunner> queue : queues) {
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
// current queue Limit for executor is already updated as part of resizeQueues, we need to
// let codel queue also make aware of it
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
codelLifoThreshold);
codelLifoThreshold, currentQueueLimit);
} else if (queue instanceof ConfigurationObserver) {
((ConfigurationObserver) queue).onConfigurationChange(conf);
}
Expand Down
Loading