From 62e40bd9b67baf891aa22096c9cf6d77ae7d85f5 Mon Sep 17 00:00:00 2001 From: Jesse Boyd Date: Mon, 6 Jun 2016 10:58:13 +1000 Subject: [PATCH] Fixes for unused EditSessions clogging queue --- .../com/boydti/fawe/bukkit/FaweBukkit.java | 2 +- .../boydti/fawe/example/MappedFaweQueue.java | 6 +- .../com/boydti/fawe/object/FaweQueue.java | 14 ++-- .../boydti/fawe/util/DelegateFaweQueue.java | 6 +- .../java/com/boydti/fawe/util/SetQueue.java | 71 ++++++++++--------- .../java/com/sk89q/worldedit/EditSession.java | 2 + 6 files changed, 56 insertions(+), 45 deletions(-) diff --git a/bukkit0/src/main/java/com/boydti/fawe/bukkit/FaweBukkit.java b/bukkit0/src/main/java/com/boydti/fawe/bukkit/FaweBukkit.java index cb01fd96..73e9a359 100644 --- a/bukkit0/src/main/java/com/boydti/fawe/bukkit/FaweBukkit.java +++ b/bukkit0/src/main/java/com/boydti/fawe/bukkit/FaweBukkit.java @@ -192,7 +192,7 @@ public class FaweBukkit implements IFawe, Listener { try { return plugin.getQueue(world); } catch (Throwable ignore) { - ignore.printStackTrace(); +// ignore.printStackTrace(); } // Disable incompatible settings Settings.PARALLEL_THREADS = 1; // BukkitAPI placer is too slow to parallel thread at the chunk level diff --git a/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java b/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java index 495e29a4..68bb3b65 100644 --- a/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java +++ b/core/src/main/java/com/boydti/fawe/example/MappedFaweQueue.java @@ -16,7 +16,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentLinkedDeque; public abstract class MappedFaweQueue extends FaweQueue { @@ -26,7 +26,7 @@ public abstract class MappedFaweQueue extends FaweQueue { * Map of chunks in the queue */ private ConcurrentHashMap blocks = new ConcurrentHashMap<>(); - private LinkedBlockingDeque chunks = new LinkedBlockingDeque() { + private ConcurrentLinkedDeque chunks = new ConcurrentLinkedDeque() { @Override public boolean add(FaweChunk o) { if (getProgressTask() != null) { @@ -297,7 +297,7 @@ public abstract class MappedFaweQueue extends FaweQueue { return chunks.size(); } - private LinkedBlockingDeque toUpdate = new LinkedBlockingDeque<>(); + private ConcurrentLinkedDeque toUpdate = new ConcurrentLinkedDeque<>(); private int dispatched = 0; diff --git a/core/src/main/java/com/boydti/fawe/object/FaweQueue.java b/core/src/main/java/com/boydti/fawe/object/FaweQueue.java index f9318242..430a4c72 100644 --- a/core/src/main/java/com/boydti/fawe/object/FaweQueue.java +++ b/core/src/main/java/com/boydti/fawe/object/FaweQueue.java @@ -14,13 +14,13 @@ import com.sk89q.worldedit.world.biome.BaseBiome; import java.util.HashSet; import java.util.Set; import java.util.UUID; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; public abstract class FaweQueue { private final String world; - private LinkedBlockingDeque sessions; + private ConcurrentLinkedDeque sessions; private long modified = System.currentTimeMillis(); private RunnableVal2 changeTask; private RunnableVal2 progressTask; @@ -47,7 +47,7 @@ public abstract class FaweQueue { return; } if (this.getSessions() == null) { - setSessions(new LinkedBlockingDeque()); + setSessions(new ConcurrentLinkedDeque()); } getSessions().add(session); } @@ -70,11 +70,11 @@ public abstract class FaweQueue { return getSessions() == null ? new HashSet() : new HashSet<>(getSessions()); } - public LinkedBlockingDeque getSessions() { + public ConcurrentLinkedDeque getSessions() { return sessions; } - public void setSessions(LinkedBlockingDeque sessions) { + public void setSessions(ConcurrentLinkedDeque sessions) { this.sessions = sessions; } @@ -215,4 +215,8 @@ public abstract class FaweQueue { public void enqueue() { SetQueue.IMP.enqueue(this); } + + public void dequeue() { + SetQueue.IMP.dequeue(this); + } } diff --git a/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java b/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java index 6732e5b8..65b38fa5 100644 --- a/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java +++ b/core/src/main/java/com/boydti/fawe/util/DelegateFaweQueue.java @@ -9,7 +9,7 @@ import com.sk89q.worldedit.EditSession; import com.sk89q.worldedit.world.biome.BaseBiome; import java.util.Set; import java.util.UUID; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentLinkedDeque; public class DelegateFaweQueue extends FaweQueue { private final FaweQueue parent; @@ -44,12 +44,12 @@ public class DelegateFaweQueue extends FaweQueue { } @Override - public LinkedBlockingDeque getSessions() { + public ConcurrentLinkedDeque getSessions() { return parent.getSessions(); } @Override - public void setSessions(LinkedBlockingDeque sessions) { + public void setSessions(ConcurrentLinkedDeque sessions) { parent.setSessions(sessions); } diff --git a/core/src/main/java/com/boydti/fawe/util/SetQueue.java b/core/src/main/java/com/boydti/fawe/util/SetQueue.java index f86382c4..89a6019c 100644 --- a/core/src/main/java/com/boydti/fawe/util/SetQueue.java +++ b/core/src/main/java/com/boydti/fawe/util/SetQueue.java @@ -6,8 +6,10 @@ import com.boydti.fawe.object.FaweChunk; import com.boydti.fawe.object.FaweQueue; import com.boydti.fawe.object.RunnableVal2; import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ConcurrentLinkedDeque; public class SetQueue { @@ -20,8 +22,8 @@ public class SetQueue { INACTIVE, ACTIVE, NONE; } - public final LinkedBlockingDeque activeQueues; - public final LinkedBlockingDeque inactiveQueues; + public final ConcurrentLinkedDeque activeQueues; + public final ConcurrentLinkedDeque inactiveQueues; /** * Used to calculate elapsed time in milliseconds and ensure block placement doesn't lag the server @@ -33,7 +35,7 @@ public class SetQueue { /** * A queue of tasks that will run when the queue is empty */ - private final LinkedBlockingDeque runnables = new LinkedBlockingDeque<>(); + private final ConcurrentLinkedDeque runnables = new ConcurrentLinkedDeque<>(); private final RunnableVal2 SET_TASK = new RunnableVal2() { @Override @@ -52,12 +54,12 @@ public class SetQueue { }; public SetQueue() { - activeQueues = new LinkedBlockingDeque(); - inactiveQueues = new LinkedBlockingDeque<>(); + activeQueues = new ConcurrentLinkedDeque(); + inactiveQueues = new ConcurrentLinkedDeque<>(); TaskManager.IMP.repeat(new Runnable() { @Override public void run() { - if (inactiveQueues.size() == 0 && activeQueues.size() == 0) { + if (inactiveQueues.isEmpty() && activeQueues.isEmpty()) { lastSuccess = System.currentTimeMillis(); tasks(); return; @@ -215,43 +217,46 @@ public class SetQueue { } public FaweQueue getNextQueue() { + long now = System.currentTimeMillis(); while (activeQueues.size() > 0) { FaweQueue queue = activeQueues.peek(); if (queue != null && queue.size() > 0) { - queue.setModified(System.currentTimeMillis()); + queue.setModified(now); return queue; } else { activeQueues.poll(); } } - if (inactiveQueues.size() > 0) { - ArrayList tmp = new ArrayList<>(inactiveQueues); - if (Settings.QUEUE_MAX_WAIT >= 0) { - long now = System.currentTimeMillis(); - if (lastSuccess != 0) { - for (FaweQueue queue : tmp) { - if (queue != null && queue.size() > 0 && now - queue.getModified() > Settings.QUEUE_MAX_WAIT) { - queue.setModified(now); - return queue; - } else if (now - queue.getModified() > Settings.QUEUE_DISCARD_AFTER) { - inactiveQueues.remove(queue); - } - } - } - } - if (Settings.QUEUE_SIZE != -1) { + int size = inactiveQueues.size(); + if (size > 0) { + Iterator iter = inactiveQueues.iterator(); + try { int total = 0; - for (FaweQueue queue : tmp) { + FaweQueue firstNonEmpty = null; + while (iter.hasNext()) { + FaweQueue queue = iter.next(); + long age = now - queue.getModified(); total += queue.size(); - } - if (total > Settings.QUEUE_SIZE) { - for (FaweQueue queue : tmp) { - if (queue != null && queue.size() > 0) { - queue.setModified(System.currentTimeMillis()); - return queue; + if (queue.size() == 0) { + if (age > Settings.QUEUE_DISCARD_AFTER) { + iter.remove(); } + continue; + } + if (firstNonEmpty == null) { + firstNonEmpty = queue; + } + if (total > Settings.QUEUE_SIZE) { + firstNonEmpty.setModified(now); + return firstNonEmpty; + } + if (age > Settings.QUEUE_MAX_WAIT) { + queue.setModified(now); + return queue; } } + } catch (ConcurrentModificationException e) { + e.printStackTrace(); } } return null; @@ -332,10 +337,10 @@ public class SetQueue { } public synchronized boolean tasks() { - if (this.runnables.size() == 0) { + if (this.runnables.isEmpty()) { return false; } - final LinkedBlockingDeque tmp = new LinkedBlockingDeque<>(this.runnables); + final ConcurrentLinkedDeque tmp = new ConcurrentLinkedDeque<>(this.runnables); this.runnables.clear(); for (final Runnable runnable : tmp) { runnable.run(); diff --git a/core/src/main/java/com/sk89q/worldedit/EditSession.java b/core/src/main/java/com/sk89q/worldedit/EditSession.java index 1ed33661..4223918e 100644 --- a/core/src/main/java/com/sk89q/worldedit/EditSession.java +++ b/core/src/main/java/com/sk89q/worldedit/EditSession.java @@ -970,6 +970,8 @@ public class EditSession implements Extent { // Enqueue it if (queue != null && queue.size() > 0) { queue.enqueue(); + } else { + queue.dequeue(); } if (changeSet != null) { if (Settings.COMBINE_HISTORY_STAGE && queue.size() > 0) {