Fixes for unused EditSessions clogging queue

This commit is contained in:
Jesse Boyd 2016-06-06 10:58:13 +10:00
parent c1b22fbb21
commit 62e40bd9b6
6 changed files with 56 additions and 45 deletions

View File

@ -192,7 +192,7 @@ public class FaweBukkit implements IFawe, Listener {
try { try {
return plugin.getQueue(world); return plugin.getQueue(world);
} catch (Throwable ignore) { } catch (Throwable ignore) {
ignore.printStackTrace(); // ignore.printStackTrace();
} }
// Disable incompatible settings // Disable incompatible settings
Settings.PARALLEL_THREADS = 1; // BukkitAPI placer is too slow to parallel thread at the chunk level Settings.PARALLEL_THREADS = 1; // BukkitAPI placer is too slow to parallel thread at the chunk level

View File

@ -16,7 +16,7 @@ import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ConcurrentLinkedDeque;
public abstract class MappedFaweQueue<WORLD, CHUNK, SECTION> extends FaweQueue { public abstract class MappedFaweQueue<WORLD, CHUNK, SECTION> extends FaweQueue {
@ -26,7 +26,7 @@ public abstract class MappedFaweQueue<WORLD, CHUNK, SECTION> extends FaweQueue {
* Map of chunks in the queue * Map of chunks in the queue
*/ */
private ConcurrentHashMap<Long, FaweChunk> blocks = new ConcurrentHashMap<>(); private ConcurrentHashMap<Long, FaweChunk> blocks = new ConcurrentHashMap<>();
private LinkedBlockingDeque<FaweChunk> chunks = new LinkedBlockingDeque<FaweChunk>() { private ConcurrentLinkedDeque<FaweChunk> chunks = new ConcurrentLinkedDeque<FaweChunk>() {
@Override @Override
public boolean add(FaweChunk o) { public boolean add(FaweChunk o) {
if (getProgressTask() != null) { if (getProgressTask() != null) {
@ -297,7 +297,7 @@ public abstract class MappedFaweQueue<WORLD, CHUNK, SECTION> extends FaweQueue {
return chunks.size(); return chunks.size();
} }
private LinkedBlockingDeque<FaweChunk> toUpdate = new LinkedBlockingDeque<>(); private ConcurrentLinkedDeque<FaweChunk> toUpdate = new ConcurrentLinkedDeque<>();
private int dispatched = 0; private int dispatched = 0;

View File

@ -14,13 +14,13 @@ import com.sk89q.worldedit.world.biome.BaseBiome;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public abstract class FaweQueue { public abstract class FaweQueue {
private final String world; private final String world;
private LinkedBlockingDeque<EditSession> sessions; private ConcurrentLinkedDeque<EditSession> sessions;
private long modified = System.currentTimeMillis(); private long modified = System.currentTimeMillis();
private RunnableVal2<FaweChunk, FaweChunk> changeTask; private RunnableVal2<FaweChunk, FaweChunk> changeTask;
private RunnableVal2<ProgressType, Integer> progressTask; private RunnableVal2<ProgressType, Integer> progressTask;
@ -47,7 +47,7 @@ public abstract class FaweQueue {
return; return;
} }
if (this.getSessions() == null) { if (this.getSessions() == null) {
setSessions(new LinkedBlockingDeque<EditSession>()); setSessions(new ConcurrentLinkedDeque<EditSession>());
} }
getSessions().add(session); getSessions().add(session);
} }
@ -70,11 +70,11 @@ public abstract class FaweQueue {
return getSessions() == null ? new HashSet<EditSession>() : new HashSet<>(getSessions()); return getSessions() == null ? new HashSet<EditSession>() : new HashSet<>(getSessions());
} }
public LinkedBlockingDeque<EditSession> getSessions() { public ConcurrentLinkedDeque<EditSession> getSessions() {
return sessions; return sessions;
} }
public void setSessions(LinkedBlockingDeque<EditSession> sessions) { public void setSessions(ConcurrentLinkedDeque<EditSession> sessions) {
this.sessions = sessions; this.sessions = sessions;
} }
@ -215,4 +215,8 @@ public abstract class FaweQueue {
public void enqueue() { public void enqueue() {
SetQueue.IMP.enqueue(this); SetQueue.IMP.enqueue(this);
} }
public void dequeue() {
SetQueue.IMP.dequeue(this);
}
} }

View File

@ -9,7 +9,7 @@ import com.sk89q.worldedit.EditSession;
import com.sk89q.worldedit.world.biome.BaseBiome; import com.sk89q.worldedit.world.biome.BaseBiome;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ConcurrentLinkedDeque;
public class DelegateFaweQueue extends FaweQueue { public class DelegateFaweQueue extends FaweQueue {
private final FaweQueue parent; private final FaweQueue parent;
@ -44,12 +44,12 @@ public class DelegateFaweQueue extends FaweQueue {
} }
@Override @Override
public LinkedBlockingDeque<EditSession> getSessions() { public ConcurrentLinkedDeque<EditSession> getSessions() {
return parent.getSessions(); return parent.getSessions();
} }
@Override @Override
public void setSessions(LinkedBlockingDeque<EditSession> sessions) { public void setSessions(ConcurrentLinkedDeque<EditSession> sessions) {
parent.setSessions(sessions); parent.setSessions(sessions);
} }

View File

@ -6,8 +6,10 @@ import com.boydti.fawe.object.FaweChunk;
import com.boydti.fawe.object.FaweQueue; import com.boydti.fawe.object.FaweQueue;
import com.boydti.fawe.object.RunnableVal2; import com.boydti.fawe.object.RunnableVal2;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ConcurrentLinkedDeque;
public class SetQueue { public class SetQueue {
@ -20,8 +22,8 @@ public class SetQueue {
INACTIVE, ACTIVE, NONE; INACTIVE, ACTIVE, NONE;
} }
public final LinkedBlockingDeque<FaweQueue> activeQueues; public final ConcurrentLinkedDeque<FaweQueue> activeQueues;
public final LinkedBlockingDeque<FaweQueue> inactiveQueues; public final ConcurrentLinkedDeque<FaweQueue> inactiveQueues;
/** /**
* Used to calculate elapsed time in milliseconds and ensure block placement doesn't lag the server * Used to calculate elapsed time in milliseconds and ensure block placement doesn't lag the server
@ -33,7 +35,7 @@ public class SetQueue {
/** /**
* A queue of tasks that will run when the queue is empty * A queue of tasks that will run when the queue is empty
*/ */
private final LinkedBlockingDeque<Runnable> runnables = new LinkedBlockingDeque<>(); private final ConcurrentLinkedDeque<Runnable> runnables = new ConcurrentLinkedDeque<>();
private final RunnableVal2<Long, FaweQueue> SET_TASK = new RunnableVal2<Long, FaweQueue>() { private final RunnableVal2<Long, FaweQueue> SET_TASK = new RunnableVal2<Long, FaweQueue>() {
@Override @Override
@ -52,12 +54,12 @@ public class SetQueue {
}; };
public SetQueue() { public SetQueue() {
activeQueues = new LinkedBlockingDeque(); activeQueues = new ConcurrentLinkedDeque();
inactiveQueues = new LinkedBlockingDeque<>(); inactiveQueues = new ConcurrentLinkedDeque<>();
TaskManager.IMP.repeat(new Runnable() { TaskManager.IMP.repeat(new Runnable() {
@Override @Override
public void run() { public void run() {
if (inactiveQueues.size() == 0 && activeQueues.size() == 0) { if (inactiveQueues.isEmpty() && activeQueues.isEmpty()) {
lastSuccess = System.currentTimeMillis(); lastSuccess = System.currentTimeMillis();
tasks(); tasks();
return; return;
@ -215,43 +217,46 @@ public class SetQueue {
} }
public FaweQueue getNextQueue() { public FaweQueue getNextQueue() {
long now = System.currentTimeMillis();
while (activeQueues.size() > 0) { while (activeQueues.size() > 0) {
FaweQueue queue = activeQueues.peek(); FaweQueue queue = activeQueues.peek();
if (queue != null && queue.size() > 0) { if (queue != null && queue.size() > 0) {
queue.setModified(System.currentTimeMillis()); queue.setModified(now);
return queue; return queue;
} else { } else {
activeQueues.poll(); activeQueues.poll();
} }
} }
if (inactiveQueues.size() > 0) { int size = inactiveQueues.size();
ArrayList<FaweQueue> tmp = new ArrayList<>(inactiveQueues); if (size > 0) {
if (Settings.QUEUE_MAX_WAIT >= 0) { Iterator<FaweQueue> iter = inactiveQueues.iterator();
long now = System.currentTimeMillis(); try {
if (lastSuccess != 0) {
for (FaweQueue queue : tmp) {
if (queue != null && queue.size() > 0 && now - queue.getModified() > Settings.QUEUE_MAX_WAIT) {
queue.setModified(now);
return queue;
} else if (now - queue.getModified() > Settings.QUEUE_DISCARD_AFTER) {
inactiveQueues.remove(queue);
}
}
}
}
if (Settings.QUEUE_SIZE != -1) {
int total = 0; int total = 0;
for (FaweQueue queue : tmp) { FaweQueue firstNonEmpty = null;
while (iter.hasNext()) {
FaweQueue queue = iter.next();
long age = now - queue.getModified();
total += queue.size(); total += queue.size();
if (queue.size() == 0) {
if (age > Settings.QUEUE_DISCARD_AFTER) {
iter.remove();
}
continue;
}
if (firstNonEmpty == null) {
firstNonEmpty = queue;
} }
if (total > Settings.QUEUE_SIZE) { if (total > Settings.QUEUE_SIZE) {
for (FaweQueue queue : tmp) { firstNonEmpty.setModified(now);
if (queue != null && queue.size() > 0) { return firstNonEmpty;
queue.setModified(System.currentTimeMillis()); }
if (age > Settings.QUEUE_MAX_WAIT) {
queue.setModified(now);
return queue; return queue;
} }
} }
} } catch (ConcurrentModificationException e) {
e.printStackTrace();
} }
} }
return null; return null;
@ -332,10 +337,10 @@ public class SetQueue {
} }
public synchronized boolean tasks() { public synchronized boolean tasks() {
if (this.runnables.size() == 0) { if (this.runnables.isEmpty()) {
return false; return false;
} }
final LinkedBlockingDeque<Runnable> tmp = new LinkedBlockingDeque<>(this.runnables); final ConcurrentLinkedDeque<Runnable> tmp = new ConcurrentLinkedDeque<>(this.runnables);
this.runnables.clear(); this.runnables.clear();
for (final Runnable runnable : tmp) { for (final Runnable runnable : tmp) {
runnable.run(); runnable.run();

View File

@ -970,6 +970,8 @@ public class EditSession implements Extent {
// Enqueue it // Enqueue it
if (queue != null && queue.size() > 0) { if (queue != null && queue.size() > 0) {
queue.enqueue(); queue.enqueue();
} else {
queue.dequeue();
} }
if (changeSet != null) { if (changeSet != null) {
if (Settings.COMBINE_HISTORY_STAGE && queue.size() > 0) { if (Settings.COMBINE_HISTORY_STAGE && queue.size() > 0) {