Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ public ExecutorManager(int workerThreadCount, Consumer<Thread> threadInitializer
boolean tryLock(Task task) {
retry:
while (true) {
final FreeableTaskList listenerSet = new FreeableTaskList();
LockToken[] lockTokens = task.lockTokens();
if (lockTokens.length == 0) {
return true;
}

final FreeableTaskList listenerSet = new FreeableTaskList();
for (int i = 0; i < lockTokens.length; i++) {
LockToken token = lockTokens[i];
final FreeableTaskList present = this.lockListeners.putIfAbsent(token, listenerSet);
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/ishland/flowsched/executor/SimpleTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Objects;

public class SimpleTask extends Task {
private static final LockToken[] EMPTY_LOCK_TOKENS = new LockToken[0];

private final Runnable wrapped;

Expand All @@ -26,6 +27,6 @@ public void propagateException(Throwable t) {

@Override
public LockToken[] lockTokens() {
return new LockToken[0];
return EMPTY_LOCK_TOKENS;
}
}
35 changes: 22 additions & 13 deletions src/main/java/com/ishland/flowsched/scheduler/TicketSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;

import java.lang.invoke.VarHandle;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;

/**
Expand All @@ -21,15 +23,12 @@ public TicketSet(ItemStatus<K, V, Ctx> initialStatus, ObjectFactory objectFactor
this.targetStatus = initialStatus.ordinal();
ItemStatus<K, V, Ctx>[] allStatuses = initialStatus.getAllStatuses();
this.status2Tickets = new Set[allStatuses.length];
for (int i = 0; i < allStatuses.length; i++) {
this.status2Tickets[i] = new ObjectOpenHashSet<>(ObjectOpenHashSet.DEFAULT_INITIAL_SIZE, ObjectOpenHashSet.FAST_LOAD_FACTOR);
}
this.status2TicketsSize = new int[allStatuses.length];
VarHandle.fullFence();
}

public boolean checkAdd(ItemStatus<K, V, Ctx> targetStatus, ItemTicket ticket) {
final boolean added = this.status2Tickets[targetStatus.ordinal()].add(ticket);
final boolean added = this.getOrCreateTicketsForStatus(targetStatus.ordinal()).add(ticket);
return added;
}

Expand All @@ -41,8 +40,8 @@ public void addUnchecked(ItemStatus<K, V, Ctx> targetStatus) {
}

public boolean checkRemove(ItemStatus<K, V, Ctx> targetStatus, ItemTicket ticket) {
final boolean removed = this.status2Tickets[targetStatus.ordinal()].remove(ticket);
return removed;
final Set<ItemTicket> tickets = this.status2Tickets[targetStatus.ordinal()];
return tickets != null && tickets.remove(ticket);
}

public void removeUnchecked(ItemStatus<K, V, Ctx> targetStatus) {
Expand All @@ -61,30 +60,40 @@ public ItemStatus<K, V, Ctx> getTargetStatus() {
}

public Set<ItemTicket> getTicketsForStatus(ItemStatus<K, V, Ctx> status) {
return this.status2Tickets[status.ordinal()];
final Set<ItemTicket> tickets = this.status2Tickets[status.ordinal()];
return tickets != null ? tickets : Collections.emptySet();
}

void clear() {
for (Set<ItemTicket> tickets : status2Tickets) {
tickets.clear();
}
Arrays.fill(this.status2Tickets, null);
Arrays.fill(this.status2TicketsSize, 0);
this.targetStatus = this.initialStatus.ordinal();

VarHandle.fullFence();
}

void assertEmpty() {
for (Set<ItemTicket> tickets : status2Tickets) {
Assertions.assertTrue(tickets.isEmpty());
Assertions.assertTrue(tickets == null || tickets.isEmpty());
}
}

private int computeTargetStatusSlow() {
for (int i = this.status2Tickets.length - 1; i > 0; i--) {
for (int i = this.status2TicketsSize.length - 1; i > 0; i--) {
if (this.status2TicketsSize[i] > 0) {
return i;
}
}
return 0;
return this.initialStatus.ordinal();
}

private Set<ItemTicket> getOrCreateTicketsForStatus(int statusOrdinal) {
Set<ItemTicket> tickets = this.status2Tickets[statusOrdinal];
if (tickets == null) {
tickets = new ObjectOpenHashSet<>(ObjectOpenHashSet.DEFAULT_INITIAL_SIZE, ObjectOpenHashSet.FAST_LOAD_FACTOR);
this.status2Tickets[statusOrdinal] = tickets;
}
return tickets;
}

}
Loading