From 729f4f03b190546f3e3de4cd5abf38bd8cd50409 Mon Sep 17 00:00:00 2001 From: Poweruser Date: Sat, 28 May 2016 21:51:43 +0200 Subject: [PATCH] Combine different CachedThreadPools into one diff --git a/src/main/java/net/frozenorb/NamePriorityThreadFactory.java b/src/main/java/net/frozenorb/NamePriorityThreadFactory.java index e0a6e76ed..bc084b61d 100644 --- a/src/main/java/net/frozenorb/NamePriorityThreadFactory.java +++ b/src/main/java/net/frozenorb/NamePriorityThreadFactory.java @@ -1,5 +1,9 @@ package net.frozenorb; +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -8,29 +12,36 @@ public class NamePriorityThreadFactory implements ThreadFactory { private int idCounter = 0; private String name = "mSpigotThread"; private boolean isDaemon = false; + private Queue> createdThreadList; public NamePriorityThreadFactory(int priority) { this.priority = Math.min(Math.max(priority, Thread.MIN_PRIORITY), Thread.MAX_PRIORITY); } - public NamePriorityThreadFactory(int priority, boolean daemon) { + public NamePriorityThreadFactory(int priority, String name) { this(priority); - this.isDaemon = daemon; + this.name = name; } - public NamePriorityThreadFactory(int priority, String name) { - this(priority); + public NamePriorityThreadFactory(String name) { + this(Thread.NORM_PRIORITY); this.name = name; } - public NamePriorityThreadFactory(int priority, String name, boolean daemon) { - this(priority, name); + public NamePriorityThreadFactory setDaemon(boolean daemon) { this.isDaemon = daemon; + return this; } - public NamePriorityThreadFactory(String name) { - this(Thread.NORM_PRIORITY); - this.name = name; + public NamePriorityThreadFactory setLogThreads(boolean log) { + if(log) { + if(this.createdThreadList == null) { + this.createdThreadList = new ConcurrentLinkedQueue>(); + } + } else { + this.createdThreadList = null; + } + return this; } @Override @@ -39,7 +50,32 @@ public class NamePriorityThreadFactory implements ThreadFactory { thread.setPriority(this.priority); thread.setName(this.name + "-" + String.valueOf(idCounter)); thread.setDaemon(this.isDaemon); + if(this.createdThreadList != null) { + this.createdThreadList.add(new WeakReference(thread)); + } idCounter++; return thread; } + + public int getActiveCount() { + if(this.createdThreadList != null) { + Iterator> iter = this.createdThreadList.iterator(); + int count = 0; + while(iter.hasNext()) { + WeakReference ref = iter.next(); + Thread t = ref.get(); + if(t == null) { + iter.remove(); + } else if(t.isAlive()) { + count++; + } + } + return count; + } + return -1; + } + + public Queue> getThreadList() { + return this.createdThreadList; + } } diff --git a/src/main/java/net/frozenorb/ThreadingManager.java b/src/main/java/net/frozenorb/ThreadingManager.java index 0ee93dbf0..e0b3c38f2 100644 --- a/src/main/java/net/frozenorb/ThreadingManager.java +++ b/src/main/java/net/frozenorb/ThreadingManager.java @@ -4,10 +4,17 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.lang.ref.WeakReference; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -21,42 +28,81 @@ import net.minecraft.server.NBTTagCompound; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.spigotmc.SpigotConfig; public class ThreadingManager { private final Logger log = LogManager.getLogger(); - private ExecutorService nbtFileService = Executors.newSingleThreadExecutor(new NamePriorityThreadFactory(Thread.NORM_PRIORITY - 2, "mSpigot_NBTFileSaver")); private static ThreadingManager instance; private PathSearchThrottlerThread pathSearchThrottler; private ScheduledExecutorService timerService = Executors.newScheduledThreadPool(1, new NamePriorityThreadFactory(Thread.NORM_PRIORITY + 2, "mSpigot_TimerService")); private TickCounter tickCounter = new TickCounter(); + private NamePriorityThreadFactory cachedThreadPoolFactory; + private ExecutorService cachedThreadPool; private ScheduledFuture tickTimerTask; private TickTimer tickTimerObject; private static int timerDelay = 45; + private TaskQueueWorker nbtFiles; + private TaskQueueWorker headConversions; + public ThreadingManager() { instance = this; this.pathSearchThrottler = new PathSearchThrottlerThread(2); this.timerService.scheduleAtFixedRate(this.tickCounter, 1, 1000, TimeUnit.MILLISECONDS); this.tickTimerObject = new TickTimer(); + this.cachedThreadPoolFactory = new NamePriorityThreadFactory(Thread.currentThread().getPriority() - 1, "mSpigot_Async-Executor").setLogThreads(true).setDaemon(true); + this.cachedThreadPool = Executors.newCachedThreadPool(this.cachedThreadPoolFactory); + this.nbtFiles = this.createTaskQueueWorker(); + this.headConversions = this.createTaskQueueWorker(); } public void shutdown() { this.pathSearchThrottler.shutdown(); - this.nbtFileService.shutdown(); this.timerService.shutdown(); - while(!this.nbtFileService.isTerminated()) { + this.cachedThreadPool.shutdown(); + while((this.nbtFiles.isActive()) && !this.cachedThreadPool.isTerminated()) { try { - if(!this.nbtFileService.awaitTermination(3, TimeUnit.MINUTES)) { - log.warn("mSpigot is still waiting for NBT Files to be saved."); - } + this.cachedThreadPool.awaitTermination(10, TimeUnit.SECONDS); + log.warn("mSpigot is still waiting for NBT files to be written to disk. " + this.nbtFiles.getTaskCount() + " to go..."); } catch(InterruptedException e) {} } + if(!this.cachedThreadPool.isTerminated()) { + this.cachedThreadPool.shutdownNow(); + try { + this.cachedThreadPool.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if(SpigotConfig.logRemainingAsyncThreadsDuringShutdown && this.cachedThreadPoolFactory.getActiveCount() > 0) { + log.warn("mSpigot is still waiting for " + this.cachedThreadPoolFactory.getActiveCount() + " async threads to finish."); + Queue> queue = this.cachedThreadPoolFactory.getThreadList(); + Iterator> iter = null; + if(queue != null) { + System.out.println("== List of async threads that did not terminate on their own == "); + iter = queue.iterator(); + } + while(iter != null && iter.hasNext()) { + WeakReference ref = iter.next(); + Thread t = ref.get(); + if(t == null) { + iter.remove(); + } else if (t.isAlive()) { + StackTraceElement[] e = t.getStackTrace(); + System.out.println(t.getName() + " - " + t.getState().toString()); + for(StackTraceElement et: e) { + System.out.println(et.toString()); + } + System.out.println("========================== "); + } + } + } + } } public static void saveNBTPlayerDataStatic(PlayerDataSaveJob savejob) { - instance.nbtFileService.execute(savejob); + instance.nbtFiles.queueTask(savejob); } public static void saveNBTFileStatic(NBTTagCompound compound, File file) { @@ -64,7 +110,7 @@ public class ThreadingManager { } public void saveNBTFile(NBTTagCompound compound, File file) { - this.nbtFileService.execute(new NBTFileSaver(compound, file)); + this.nbtFiles.queueTask(new NBTFileSaver(compound, file)); } private class NBTFileSaver implements Runnable { @@ -176,4 +222,79 @@ public class ThreadingManager { public static void addWorldStatsTask(WorldStatsTask task) { instance.timerService.schedule(task, 2, TimeUnit.SECONDS); } + + public static void execute(Runnable runnable) { + instance.cachedThreadPool.execute(runnable); + } + + public static Future submit(Runnable runnable) { + return instance.cachedThreadPool.submit(runnable); + } + + public static Future submit(Callable callable) { + return instance.cachedThreadPool.submit(callable); + } + + public static void queueHeadConversion(Runnable runnable) { + instance.headConversions.queueTask(runnable); + } + + public static TaskQueueWorker createTaskQueue() { + return instance.createTaskQueueWorker(); + } + + public TaskQueueWorker createTaskQueueWorker() { + return new TaskQueueWorker(this.cachedThreadPool); + } + + public class TaskQueueWorker implements Runnable { + + private ConcurrentLinkedDeque taskQueue = new ConcurrentLinkedDeque(); + private ExecutorService service; + private volatile boolean isActive = false; + + public TaskQueueWorker(ExecutorService service) { + this.service = service; + } + + @Override + public void run() { + Runnable task = null; + while(this.isActive = ((task = this.taskQueue.pollFirst()) != null)) { + try { + task.run(); + } catch (Exception e) { + log.error("Thread " + Thread.currentThread().getName() + " encountered an exception: " + e.getMessage(), e); + } + } + } + + public void queueTask(Runnable runnable) { + this.taskQueue.addLast(runnable); + if(!this.isActive) { + this.isActive = true; + this.service.execute(this); + } + } + + public boolean isActive() { + if(!this.isActive && !this.taskQueue.isEmpty()) { + this.isActive = true; + this.service.execute(this); + } + return this.isActive; + } + + public int getTaskCount() { + int count = this.taskQueue.size(); + if(this.isActive) { + count++; + } + return count; + } + } + + public static Executor getCommonThreadPool() { + return instance.cachedThreadPool; + } } diff --git a/src/main/java/net/minecraft/server/ItemStack.java b/src/main/java/net/minecraft/server/ItemStack.java index 5051a3f7d..ac8e05f89 100644 --- a/src/main/java/net/minecraft/server/ItemStack.java +++ b/src/main/java/net/minecraft/server/ItemStack.java @@ -18,6 +18,8 @@ import org.bukkit.entity.Player; import org.bukkit.event.world.StructureGrowEvent; // CraftBukkit end +import net.frozenorb.ThreadingManager; // Poweruser + public final class ItemStack { public static final DecimalFormat a = new DecimalFormat("#.###"); @@ -246,7 +248,7 @@ public final class ItemStack { } final String finalOwner = owner; - TileEntitySkull.executor.execute( new Runnable() + ThreadingManager.queueHeadConversion(new Runnable() // Poweruser { @Override public void run() diff --git a/src/main/java/net/minecraft/server/LoginListener.java b/src/main/java/net/minecraft/server/LoginListener.java index 8d24d4563..d88181373 100644 --- a/src/main/java/net/minecraft/server/LoginListener.java +++ b/src/main/java/net/minecraft/server/LoginListener.java @@ -16,6 +16,14 @@ import net.minecraft.util.org.apache.commons.lang3.Validate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +// Pweruser start +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import net.frozenorb.NamePriorityThreadFactory; +import net.frozenorb.ThreadingManager; +// Poweruser end + public class LoginListener implements PacketLoginInListener { private static final AtomicInteger b = new AtomicInteger(0); @@ -140,7 +148,7 @@ public class LoginListener implements PacketLoginInListener { this.g = EnumProtocolState.KEY; this.networkManager.handle(new PacketLoginOutEncryptionBegin(this.j, this.server.K().getPublic(), this.e), new GenericFutureListener[0]); } else { - (new ThreadPlayerLookupUUID(this, "User Authenticator #" + b.incrementAndGet())).start(); // Spigot + ThreadingManager.execute(new ThreadPlayerLookupUUID(this)); // Poweruser } } @@ -154,7 +162,7 @@ public class LoginListener implements PacketLoginInListener { this.loginKey = packetlogininencryptionbegin.a(privatekey); this.g = EnumProtocolState.AUTHENTICATING; this.networkManager.a(this.loginKey); - (new ThreadPlayerLookupUUID(this, "User Authenticator #" + b.incrementAndGet())).start(); + ThreadingManager.execute(new ThreadPlayerLookupUUID(this)); // Poweruser } } diff --git a/src/main/java/net/minecraft/server/PacketPlayInChat.java b/src/main/java/net/minecraft/server/PacketPlayInChat.java index d419f0f71..6146e7a70 100644 --- a/src/main/java/net/minecraft/server/PacketPlayInChat.java +++ b/src/main/java/net/minecraft/server/PacketPlayInChat.java @@ -2,6 +2,8 @@ package net.minecraft.server; import java.io.IOException; // CraftBukkit +import net.frozenorb.ThreadingManager; // Poweruser + public class PacketPlayInChat extends Packet { private String message; @@ -44,13 +46,11 @@ public class PacketPlayInChat extends Packet { // CraftBukkit end // Spigot Start - private static final java.util.concurrent.ExecutorService executors = java.util.concurrent.Executors.newCachedThreadPool( - new com.google.common.util.concurrent.ThreadFactoryBuilder().setDaemon( true ).setNameFormat( "Async Chat Thread - #%d" ).build() ); public void handle(final PacketListener packetlistener) { if ( a() ) { - executors.submit( new Runnable() + ThreadingManager.submit( new Runnable() // Poweruser { @Override diff --git a/src/main/java/net/minecraft/server/ThreadPlayerLookupUUID.java b/src/main/java/net/minecraft/server/ThreadPlayerLookupUUID.java index fca32adf9..a28c9f6fb 100644 --- a/src/main/java/net/minecraft/server/ThreadPlayerLookupUUID.java +++ b/src/main/java/net/minecraft/server/ThreadPlayerLookupUUID.java @@ -12,12 +12,11 @@ import org.bukkit.event.player.AsyncPlayerPreLoginEvent; import org.bukkit.event.player.PlayerPreLoginEvent; // CraftBukkit end -class ThreadPlayerLookupUUID extends Thread { +class ThreadPlayerLookupUUID implements Runnable { // Poweruser final LoginListener a; - ThreadPlayerLookupUUID(LoginListener loginlistener, String s) { - super(s); + ThreadPlayerLookupUUID(LoginListener loginlistener) { // Poweruser this.a = loginlistener; } diff --git a/src/main/java/net/minecraft/server/TileEntitySkull.java b/src/main/java/net/minecraft/server/TileEntitySkull.java index 1796a56fc..227d345e9 100644 --- a/src/main/java/net/minecraft/server/TileEntitySkull.java +++ b/src/main/java/net/minecraft/server/TileEntitySkull.java @@ -18,17 +18,14 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import net.minecraft.util.com.mojang.authlib.Agent; // Spigot end +import net.frozenorb.ThreadingManager; // Poweruser + public class TileEntitySkull extends TileEntity { private int a; private int i; private GameProfile j = null; // Spigot start - public static final Executor executor = Executors.newFixedThreadPool(3, - new ThreadFactoryBuilder() - .setNameFormat("Head Conversion Thread - %1$d") - .build() - ); public static final Cache skinCache = CacheBuilder.newBuilder() .maximumSize( 5000 ) .expireAfterAccess( 60, TimeUnit.MINUTES ) @@ -122,7 +119,7 @@ public class TileEntitySkull extends TileEntity { // Spigot start - Handle async final String name = this.j.getName(); setSkullType( 0 ); // Work around a client bug - executor.execute(new Runnable() { + ThreadingManager.queueHeadConversion(new Runnable() { // Poweruser @Override public void run() { diff --git a/src/main/java/net/minecraft/server/World.java b/src/main/java/net/minecraft/server/World.java index 3b42cbecc..609fe32ae 100644 --- a/src/main/java/net/minecraft/server/World.java +++ b/src/main/java/net/minecraft/server/World.java @@ -40,6 +40,8 @@ import org.bukkit.event.weather.ThunderChangeEvent; // Poweruser start import net.frozenorb.LightingUpdater; import net.frozenorb.WeakChunkCache; +import net.frozenorb.ThreadingManager; +import net.frozenorb.ThreadingManager.TaskQueueWorker; // Poweruser end public abstract class World implements IBlockAccess { @@ -128,16 +130,9 @@ public abstract class World implements IBlockAccess { public static boolean haveWeSilencedAPhysicsCrash; public static String blockLocation; public List triggerHoppersList = new ArrayList(); // Spigot, When altHopperTicking, tile entities being added go through here. - // Poweruser start - only one thread, and with lower priority. Instead of one for each world - private static ExecutorService lightingExecutor; // PaperSpigot - Asynchronous lighting updates + // Poweruser start private LightingUpdater lightingUpdater = new LightingUpdater(); - - public static ExecutorService getLightingExecutor() { - if(lightingExecutor == null) { - lightingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setPriority(Thread.NORM_PRIORITY - 1).setNameFormat("PaperSpigot - Lighting Thread").build()); - } - return lightingExecutor; - } + private TaskQueueWorker lightingQueue = ThreadingManager.createTaskQueue(); // Poweruser end public final Map explosionDensityCache = new HashMap(); // PaperSpigot - Optimize explosions @@ -2672,7 +2667,7 @@ public abstract class World implements IBlockAccess { } // Poweruser start - getLightingExecutor().submit(new Runnable() { + this.lightingQueue.queueTask(new Runnable() { @Override public void run() { try { diff --git a/src/main/java/org/bukkit/craftbukkit/inventory/CraftMetaSkull.java b/src/main/java/org/bukkit/craftbukkit/inventory/CraftMetaSkull.java index e32bcb181..b4258807e 100644 --- a/src/main/java/org/bukkit/craftbukkit/inventory/CraftMetaSkull.java +++ b/src/main/java/org/bukkit/craftbukkit/inventory/CraftMetaSkull.java @@ -14,6 +14,8 @@ import org.bukkit.inventory.meta.SkullMeta; import com.google.common.collect.ImmutableMap.Builder; +import net.frozenorb.ThreadingManager; // Poweruser + @DelegateDeserialization(SerializableMeta.class) class CraftMetaSkull extends CraftMetaItem implements SkullMeta { static final ItemMetaKey SKULL_OWNER = new ItemMetaKey("SkullOwner", "skull-owner"); @@ -56,7 +58,7 @@ class CraftMetaSkull extends CraftMetaItem implements SkullMeta { // Spigot start - do an async lookup of the profile. // Unfortunately there is not way to refresh the holding // inventory, so that responsibility is left to the user. - net.minecraft.server.TileEntitySkull.executor.execute( new Runnable() + ThreadingManager.queueHeadConversion(new Runnable() // Poweruser { @Override public void run() diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java index 93d8d4248..361d39663 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java @@ -14,6 +14,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; +import net.frozenorb.ThreadingManager; // Poweruser + import org.apache.commons.lang.Validate; import org.bukkit.plugin.IllegalPluginAccessException; import org.bukkit.plugin.Plugin; @@ -72,7 +74,7 @@ public class CraftScheduler implements BukkitScheduler { */ private final ConcurrentHashMap runners = new ConcurrentHashMap(); private volatile int currentTick = -1; - private final Executor executor = Executors.newCachedThreadPool(new com.google.common.util.concurrent.ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %1$d").build()); // Spigot + private final Executor executor = ThreadingManager.getCommonThreadPool(); // Poweruser private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {@Override StringBuilder debugTo(StringBuilder string) {return string;}}; private CraftAsyncDebugger debugTail = debugHead; private static final int RECENT_TICKS; diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java index 50f16bee4..5bbe73ad3 100644 --- a/src/main/java/org/spigotmc/SpigotConfig.java +++ b/src/main/java/org/spigotmc/SpigotConfig.java @@ -408,6 +408,11 @@ public class SpigotConfig } } + public static boolean logRemainingAsyncThreadsDuringShutdown; + private static void logRemainingAsyncThreadsDuringShutdown() { + logRemainingAsyncThreadsDuringShutdown = getBoolean( "settings.logRemainingAsyncThreadsDuringShutdown" , true); + } + private static void powertpsCommand() { commands.put( "tps2", new net.frozenorb.command.TPSCommand( "tps2" ) ); -- 2.13.3