diff --git a/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/ClansQueueMessageBody.java b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/ClansQueueMessageBody.java index 9a81b2c75..7c739f166 100644 --- a/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/ClansQueueMessageBody.java +++ b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/ClansQueueMessageBody.java @@ -7,7 +7,6 @@ public abstract class ClansQueueMessageBody @Override public final String toString() { - super.toString(); return Utility.serialize(this); } } \ No newline at end of file diff --git a/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/EnclosedInteger.java b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/EnclosedInteger.java new file mode 100644 index 000000000..8342fd33e --- /dev/null +++ b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/EnclosedInteger.java @@ -0,0 +1,39 @@ +package com.mineplex.clansqueue.common; + +import javax.annotation.concurrent.NotThreadSafe; + +@NotThreadSafe +public class EnclosedInteger +{ + private int _value; + + public EnclosedInteger(int value) + { + _value = value; + } + + public EnclosedInteger() + { + this(0); + } + + public int get() + { + return _value; + } + + public int getAndIncrement() + { + return _value++; + } + + public int incrementAndGet() + { + return ++_value; + } + + public void set(int newValue) + { + _value = newValue; + } +} \ No newline at end of file diff --git a/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/QueueConstant.java b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/QueueConstant.java index 241f40ea0..9be7e93e8 100644 --- a/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/QueueConstant.java +++ b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/QueueConstant.java @@ -1,8 +1,12 @@ package com.mineplex.clansqueue.common; +import java.util.concurrent.TimeUnit; + public class QueueConstant { public static final String SERVICE_MESSENGER_IDENTIFIER = "Queue System"; public static final int BYPASS_QUEUE_WEIGHT = -1; public static final int MAX_TRANSFERS_PER_UPDATE = 5; + public static final int MAXIMUM_WEIGHT_FROM_INCREASE = 6; + public static final long TIME_TO_INCREASE_WEIGHT = TimeUnit.MINUTES.toMillis(10); } \ No newline at end of file diff --git a/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/SortableLinkedList.java b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/SortableLinkedList.java index 63d4d1579..2b1bda0ad 100644 --- a/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/SortableLinkedList.java +++ b/Plugins/Mineplex.ClansQueue.Common/src/com/mineplex/clansqueue/common/SortableLinkedList.java @@ -3,10 +3,9 @@ package com.mineplex.clansqueue.common; import java.util.Comparator; import java.util.LinkedList; +@SuppressWarnings("serial") public class SortableLinkedList> extends LinkedList { - private static final long serialVersionUID = -1751886037436467545L; - public void sort() { sort(Comparator.naturalOrder()); diff --git a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/QueueService.java b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/QueueService.java index aea90d00d..433af8c19 100644 --- a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/QueueService.java +++ b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/QueueService.java @@ -4,7 +4,6 @@ import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import com.mineplex.clansqueue.common.ClansQueueMessenger; import com.mineplex.clansqueue.common.QueueConstant; @@ -31,7 +30,7 @@ public class QueueService } private final Region _region; - private final AtomicBoolean _running; + private boolean _running = false; private final Map _commandMap = Collections.synchronizedMap(new HashMap<>()); private final CommandSystem _commandSystem; private final ClansQueueManager _queueManager; @@ -46,17 +45,15 @@ public class QueueService { _region = Region.US; } - _running = new AtomicBoolean(); _commandSystem = new CommandSystem(this, _commandMap); _queueManager = new ClansQueueManager(this); } - private void start() + private synchronized void start() { System.out.println("[Queue Service] Enabling on region " + getRegion().name()); - _running.set(true); + _running = true; _commandSystem.start(); - _queueManager.start(); ClansQueueMessenger messenger = ClansQueueMessenger.getMessenger(QueueConstant.SERVICE_MESSENGER_IDENTIFIER); messenger.registerListener(ServerOnlineMessage.class, (online, origin) -> _queueManager.handleServerEnable(online.ServerName)); @@ -72,9 +69,9 @@ public class QueueService return _queueManager; } - public boolean isRunning() + public synchronized boolean isRunning() { - return _running.get(); + return _running; } public Region getRegion() @@ -87,9 +84,10 @@ public class QueueService _commandMap.put(command.getCommand().toLowerCase(), command); } - public void shutdown() + public synchronized void shutdown() { System.out.println("[Queue Service] Shutting down..."); - _running.set(false); + _queueManager.stop(); + _running = false; } } \ No newline at end of file diff --git a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansQueueManager.java b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansQueueManager.java index bf08084a3..ce0d765d6 100644 --- a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansQueueManager.java +++ b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansQueueManager.java @@ -2,9 +2,13 @@ package com.mineplex.clansqueue.service.queue; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import com.mineplex.clansqueue.common.ClansQueueMessenger; import com.mineplex.clansqueue.common.QueueConstant; @@ -20,25 +24,17 @@ public class ClansQueueManager { private final Map _servers = new HashMap<>(); private final Map _queues = new HashMap<>(); - private final Thread _updater; + private final ScheduledFuture _updater; public ClansQueueManager(QueueService service) { - _updater = new Thread(() -> + _updater = Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> { - while (service.isRunning()) + if (service.isRunning()) { - try - { - updateQueues(); - Thread.sleep(5000); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } + updateQueues(); } - }, "Queue Update Thread"); + }, 0, 5, TimeUnit.SECONDS); } private QueueStatusMessage buildStatusMessage(Collection queues) @@ -90,7 +86,7 @@ public class ClansQueueManager public synchronized Collection getLoadedServers() { - return _servers.values(); + return Collections.unmodifiableCollection(_servers.values()); } public synchronized void deleteServer(ClansServer server) @@ -195,8 +191,8 @@ public class ClansQueueManager } } - public void start() + public void stop() { - _updater.start(); + _updater.cancel(true); } } \ No newline at end of file diff --git a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansServer.java b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansServer.java index 375a7ebc6..945cdd29e 100644 --- a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansServer.java +++ b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ClansServer.java @@ -1,13 +1,18 @@ package com.mineplex.clansqueue.service.queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +@ThreadSafe public class ClansServer { private final String _serverName; - private final AtomicBoolean _online = new AtomicBoolean(); - private final AtomicInteger _openSlots = new AtomicInteger(); + + @GuardedBy("this") + private boolean _online = false; + + @GuardedBy("this") + private int _openSlots = 0; public ClansServer(String serverName) { @@ -19,24 +24,24 @@ public class ClansServer return _serverName; } - public boolean isOnline() + public synchronized boolean isOnline() { - return _online.get(); + return _online; } - public void setOnline(boolean online) + public synchronized void setOnline(boolean online) { - _online.set(online); + _online = online; } - public int getOpenSlots() + public synchronized int getOpenSlots() { - return _openSlots.get(); + return _openSlots; } - public void setOpenSlots(int openSlots) + public synchronized void setOpenSlots(int openSlots) { - _openSlots.set(openSlots); + _openSlots = openSlots; } @Override diff --git a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/QueuePlayer.java b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/QueuePlayer.java index 2c5048d8b..b2919280b 100644 --- a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/QueuePlayer.java +++ b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/QueuePlayer.java @@ -2,16 +2,43 @@ package com.mineplex.clansqueue.service.queue; import java.util.UUID; -public class QueuePlayer +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.NotThreadSafe; + +import com.google.common.base.Preconditions; +import com.mineplex.clansqueue.common.QueueConstant; + +@NotThreadSafe +public class QueuePlayer implements Comparable { public final UUID PlayerUUID; public final String CurrentServer; + public final long EntryTime; + public int Weight; + public long LastWeightIncrease; public int Position; - public QueuePlayer(UUID uuid, String currentServer) + public QueuePlayer(UUID uuid, String currentServer, int weight) { PlayerUUID = uuid; CurrentServer = currentServer; + EntryTime = System.currentTimeMillis(); + Weight = weight; + LastWeightIncrease = System.currentTimeMillis(); + } + + private void updateWeight() + { + if (Weight < QueueConstant.MAXIMUM_WEIGHT_FROM_INCREASE && (LastWeightIncrease + QueueConstant.TIME_TO_INCREASE_WEIGHT) < System.currentTimeMillis()) + { + Weight++; + LastWeightIncrease = System.currentTimeMillis(); + } + } + + public ImmutableQueuePlayer immutable() + { + return new ImmutableQueuePlayer(PlayerUUID, CurrentServer, Position); } @Override @@ -30,4 +57,41 @@ public class QueuePlayer return ((QueuePlayer)o).PlayerUUID.equals(PlayerUUID); } + + @Override + public int compareTo(QueuePlayer player) + { + Preconditions.checkNotNull(player); + + updateWeight(); + player.updateWeight(); + + if (Weight == player.Weight) + { + return Long.compare(EntryTime, player.EntryTime); + } + else if (Weight > player.Weight) + { + return -1; + } + else + { + return 1; + } + } + + @Immutable + public static class ImmutableQueuePlayer + { + public final UUID PlayerUUID; + public final String CurrentServer; + public final int Position; + + private ImmutableQueuePlayer(UUID uuid, String server, int position) + { + PlayerUUID = uuid; + CurrentServer = server; + Position = position; + } + } } \ No newline at end of file diff --git a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ServerQueue.java b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ServerQueue.java index e9e269efc..9446a80d6 100644 --- a/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ServerQueue.java +++ b/Plugins/Mineplex.ClansQueue/src/com/mineplex/clansqueue/service/queue/ServerQueue.java @@ -1,33 +1,57 @@ package com.mineplex.clansqueue.service.queue; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.Map; -import java.util.Optional; -import java.util.Queue; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import com.mineplex.clansqueue.common.SortableLinkedList; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import com.mineplex.clansqueue.common.EnclosedInteger; +import com.mineplex.clansqueue.common.SortableLinkedList; +import com.mineplex.clansqueue.service.queue.QueuePlayer.ImmutableQueuePlayer; + +@ThreadSafe public class ServerQueue { private final ClansServer _server; + + @GuardedBy("_sendLock") private final Map _sending = new LinkedHashMap<>(); + + @GuardedBy("_bypassLock") private final Map _bypassing = new LinkedHashMap<>(); - private final SortableLinkedList _queues = new SortableLinkedList<>(); + + @GuardedBy("_queueLock") + private final SortableLinkedList _queued = new SortableLinkedList<>(); + private final Object _bypassLock = new Object(); private final Object _queueLock = new Object(); private final Object _sendLock = new Object(); - private final AtomicBoolean _paused = new AtomicBoolean(); + private final Object _pauseLock = new Object(); + + @GuardedBy("_pauseLock") + private boolean _paused = false; public ServerQueue(ClansServer server) { _server = server; } + private void sortQueue() + { + synchronized (_queueLock) + { + _queued.sort(); + EnclosedInteger position = new EnclosedInteger(1); + _queued.forEach(qp -> + { + qp.Position = position.getAndIncrement(); + }); + } + } + public ClansServer getServer() { return _server; @@ -35,7 +59,15 @@ public class ServerQueue public boolean isPaused() { - return _paused.get() || !_server.isOnline(); + if (!_server.isOnline()) + { + return true; + } + + synchronized (_pauseLock) + { + return _paused; + } } public Map getNextSend() @@ -49,12 +81,13 @@ public class ServerQueue } } - public Map getPlayers() + public Map getPlayers() { synchronized (_queueLock) { - Map players = new LinkedHashMap<>(); - _queues.forEach(queue -> queue._players.forEach(qp -> players.put(qp.PlayerUUID, qp))); + Map players = new LinkedHashMap<>(); + sortQueue(); + _queued.forEach(qp -> players.put(qp.PlayerUUID, qp.immutable())); return players; } @@ -72,30 +105,10 @@ public class ServerQueue { synchronized (_queueLock) { - Optional queueOpt = _queues.stream().filter(q -> q._weight == weight).findFirst(); - PlayerQueue queue = queueOpt.orElseGet(() -> - { - PlayerQueue creating = new PlayerQueue(weight); - if (_queues.add(creating)) - { - _queues.sort(); - } - - return creating; - }); + QueuePlayer player = new QueuePlayer(uuid, currentServer, weight); + _queued.add(player); - QueuePlayer player = new QueuePlayer(uuid, currentServer); - queue._players.add(player); - - AtomicInteger position = new AtomicInteger(1); - if (_queues.removeIf(q -> q._players.isEmpty())) - { - _queues.sort(); - } - _queues.forEach(q -> q._players.forEach(qp -> - { - qp.Position = position.getAndIncrement(); - })); + sortQueue(); if (callback != null) { @@ -108,17 +121,9 @@ public class ServerQueue { synchronized (_queueLock) { - _queues.forEach(queue -> queue._players.removeIf(player -> player.PlayerUUID.equals(uuid))); + _queued.removeIf(player -> player.PlayerUUID.equals(uuid)); - AtomicInteger position = new AtomicInteger(1); - if (_queues.removeIf(q -> q._players.isEmpty())) - { - _queues.sort(); - } - _queues.forEach(q -> q._players.forEach(qp -> - { - qp.Position = position.getAndIncrement(); - })); + sortQueue(); if (after != null) { @@ -129,7 +134,10 @@ public class ServerQueue public void setPaused(boolean paused) { - _paused.set(paused); + synchronized (_pauseLock) + { + _paused = paused; + } } public void updatePositions(int openPlayerSlots) @@ -147,30 +155,18 @@ public class ServerQueue { if (!isPaused() && openPlayerSlots > 0) { + sortQueue(); while (send.size() < openPlayerSlots) { - if (_queues.removeIf(queue -> queue._players.isEmpty())) - { - _queues.sort(); - } - PlayerQueue queue = _queues.peek(); - if (queue == null) + QueuePlayer player = _queued.poll(); + if (player == null) { break; } - QueuePlayer player = queue._players.poll(); send.put(player.PlayerUUID, player.CurrentServer); } + sortQueue(); } - AtomicInteger position = new AtomicInteger(1); - if (_queues.removeIf(queue -> queue._players.isEmpty())) - { - _queues.sort(); - } - _queues.forEach(queue -> queue._players.forEach(qp -> - { - qp.Position = position.getAndIncrement(); - })); } if (send.isEmpty()) { @@ -181,42 +177,4 @@ public class ServerQueue _sending.putAll(send); } } - - private static class PlayerQueue implements Comparable - { - private final int _weight; - private final Queue _players = new LinkedList<>(); - - private PlayerQueue(int weight) - { - _weight = weight; - } - - @Override - public int compareTo(PlayerQueue queue) - { - if (queue == null) - { - throw new NullPointerException(); - } - return Integer.compare(queue._weight, _weight); - } - - @Override - public int hashCode() - { - return Integer.hashCode(_weight); - } - - @Override - public boolean equals(Object o) - { - if (o == null || !getClass().isInstance(o)) - { - return false; - } - - return ((PlayerQueue)o)._weight == _weight; - } - } } \ No newline at end of file diff --git a/Plugins/Mineplex.Hub.Clans/src/mineplex/clanshub/queue/HubQueueManager.java b/Plugins/Mineplex.Hub.Clans/src/mineplex/clanshub/queue/HubQueueManager.java index f4369b8cb..2eafbdb7b 100644 --- a/Plugins/Mineplex.Hub.Clans/src/mineplex/clanshub/queue/HubQueueManager.java +++ b/Plugins/Mineplex.Hub.Clans/src/mineplex/clanshub/queue/HubQueueManager.java @@ -63,7 +63,7 @@ public class HubQueueManager extends MiniClientPlugin public enum QueuePriority implements Permission { - BYPASS(-1, PermissionGroup.CONTENT, PermissionGroup.TRAINEE), + BYPASS(QueueConstant.BYPASS_QUEUE_WEIGHT, PermissionGroup.CONTENT, PermissionGroup.TRAINEE), PRIORITY(7, PermissionGroup.BUILDER), ETERNAL(6, PermissionGroup.ETERNAL), TITAN(5, PermissionGroup.TITAN),