Improve concurrency techniques used by the Clans Queue Manager and make player position in the queue based both on their rank and their time spent within it

This commit is contained in:
AlexTheCoder 2018-01-29 06:50:28 -05:00 committed by Alexander Meech
parent 2a73e49e8c
commit 3ebc898ec1
10 changed files with 206 additions and 144 deletions

View File

@ -7,7 +7,6 @@ public abstract class ClansQueueMessageBody
@Override
public final String toString()
{
super.toString();
return Utility.serialize(this);
}
}

View File

@ -0,0 +1,39 @@
package com.mineplex.clansqueue.common;
import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe
public class EnclosedInteger
{
private int _value;
public EnclosedInteger(int value)
{
_value = value;
}
public EnclosedInteger()
{
this(0);
}
public int get()
{
return _value;
}
public int getAndIncrement()
{
return _value++;
}
public int incrementAndGet()
{
return ++_value;
}
public void set(int newValue)
{
_value = newValue;
}
}

View File

@ -1,8 +1,12 @@
package com.mineplex.clansqueue.common;
import java.util.concurrent.TimeUnit;
public class QueueConstant
{
public static final String SERVICE_MESSENGER_IDENTIFIER = "Queue System";
public static final int BYPASS_QUEUE_WEIGHT = -1;
public static final int MAX_TRANSFERS_PER_UPDATE = 5;
public static final int MAXIMUM_WEIGHT_FROM_INCREASE = 6;
public static final long TIME_TO_INCREASE_WEIGHT = TimeUnit.MINUTES.toMillis(10);
}

View File

@ -3,10 +3,9 @@ package com.mineplex.clansqueue.common;
import java.util.Comparator;
import java.util.LinkedList;
@SuppressWarnings("serial")
public class SortableLinkedList<T extends Comparable<T>> extends LinkedList<T>
{
private static final long serialVersionUID = -1751886037436467545L;
public void sort()
{
sort(Comparator.naturalOrder());

View File

@ -4,7 +4,6 @@ import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import com.mineplex.clansqueue.common.ClansQueueMessenger;
import com.mineplex.clansqueue.common.QueueConstant;
@ -31,7 +30,7 @@ public class QueueService
}
private final Region _region;
private final AtomicBoolean _running;
private boolean _running = false;
private final Map<String, ConsoleCommand> _commandMap = Collections.synchronizedMap(new HashMap<>());
private final CommandSystem _commandSystem;
private final ClansQueueManager _queueManager;
@ -46,17 +45,15 @@ public class QueueService
{
_region = Region.US;
}
_running = new AtomicBoolean();
_commandSystem = new CommandSystem(this, _commandMap);
_queueManager = new ClansQueueManager(this);
}
private void start()
private synchronized void start()
{
System.out.println("[Queue Service] Enabling on region " + getRegion().name());
_running.set(true);
_running = true;
_commandSystem.start();
_queueManager.start();
ClansQueueMessenger messenger = ClansQueueMessenger.getMessenger(QueueConstant.SERVICE_MESSENGER_IDENTIFIER);
messenger.registerListener(ServerOnlineMessage.class, (online, origin) -> _queueManager.handleServerEnable(online.ServerName));
@ -72,9 +69,9 @@ public class QueueService
return _queueManager;
}
public boolean isRunning()
public synchronized boolean isRunning()
{
return _running.get();
return _running;
}
public Region getRegion()
@ -87,9 +84,10 @@ public class QueueService
_commandMap.put(command.getCommand().toLowerCase(), command);
}
public void shutdown()
public synchronized void shutdown()
{
System.out.println("[Queue Service] Shutting down...");
_running.set(false);
_queueManager.stop();
_running = false;
}
}

View File

@ -2,9 +2,13 @@ package com.mineplex.clansqueue.service.queue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.mineplex.clansqueue.common.ClansQueueMessenger;
import com.mineplex.clansqueue.common.QueueConstant;
@ -20,25 +24,17 @@ public class ClansQueueManager
{
private final Map<String, ClansServer> _servers = new HashMap<>();
private final Map<ClansServer, ServerQueue> _queues = new HashMap<>();
private final Thread _updater;
private final ScheduledFuture<?> _updater;
public ClansQueueManager(QueueService service)
{
_updater = new Thread(() ->
_updater = Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() ->
{
while (service.isRunning())
if (service.isRunning())
{
try
{
updateQueues();
Thread.sleep(5000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
updateQueues();
}
}, "Queue Update Thread");
}, 0, 5, TimeUnit.SECONDS);
}
private QueueStatusMessage buildStatusMessage(Collection<ServerQueue> queues)
@ -90,7 +86,7 @@ public class ClansQueueManager
public synchronized Collection<ClansServer> getLoadedServers()
{
return _servers.values();
return Collections.unmodifiableCollection(_servers.values());
}
public synchronized void deleteServer(ClansServer server)
@ -195,8 +191,8 @@ public class ClansQueueManager
}
}
public void start()
public void stop()
{
_updater.start();
_updater.cancel(true);
}
}

View File

@ -1,13 +1,18 @@
package com.mineplex.clansqueue.service.queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public class ClansServer
{
private final String _serverName;
private final AtomicBoolean _online = new AtomicBoolean();
private final AtomicInteger _openSlots = new AtomicInteger();
@GuardedBy("this")
private boolean _online = false;
@GuardedBy("this")
private int _openSlots = 0;
public ClansServer(String serverName)
{
@ -19,24 +24,24 @@ public class ClansServer
return _serverName;
}
public boolean isOnline()
public synchronized boolean isOnline()
{
return _online.get();
return _online;
}
public void setOnline(boolean online)
public synchronized void setOnline(boolean online)
{
_online.set(online);
_online = online;
}
public int getOpenSlots()
public synchronized int getOpenSlots()
{
return _openSlots.get();
return _openSlots;
}
public void setOpenSlots(int openSlots)
public synchronized void setOpenSlots(int openSlots)
{
_openSlots.set(openSlots);
_openSlots = openSlots;
}
@Override

View File

@ -2,16 +2,43 @@ package com.mineplex.clansqueue.service.queue;
import java.util.UUID;
public class QueuePlayer
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.NotThreadSafe;
import com.google.common.base.Preconditions;
import com.mineplex.clansqueue.common.QueueConstant;
@NotThreadSafe
public class QueuePlayer implements Comparable<QueuePlayer>
{
public final UUID PlayerUUID;
public final String CurrentServer;
public final long EntryTime;
public int Weight;
public long LastWeightIncrease;
public int Position;
public QueuePlayer(UUID uuid, String currentServer)
public QueuePlayer(UUID uuid, String currentServer, int weight)
{
PlayerUUID = uuid;
CurrentServer = currentServer;
EntryTime = System.currentTimeMillis();
Weight = weight;
LastWeightIncrease = System.currentTimeMillis();
}
private void updateWeight()
{
if (Weight < QueueConstant.MAXIMUM_WEIGHT_FROM_INCREASE && (LastWeightIncrease + QueueConstant.TIME_TO_INCREASE_WEIGHT) < System.currentTimeMillis())
{
Weight++;
LastWeightIncrease = System.currentTimeMillis();
}
}
public ImmutableQueuePlayer immutable()
{
return new ImmutableQueuePlayer(PlayerUUID, CurrentServer, Position);
}
@Override
@ -30,4 +57,41 @@ public class QueuePlayer
return ((QueuePlayer)o).PlayerUUID.equals(PlayerUUID);
}
@Override
public int compareTo(QueuePlayer player)
{
Preconditions.checkNotNull(player);
updateWeight();
player.updateWeight();
if (Weight == player.Weight)
{
return Long.compare(EntryTime, player.EntryTime);
}
else if (Weight > player.Weight)
{
return -1;
}
else
{
return 1;
}
}
@Immutable
public static class ImmutableQueuePlayer
{
public final UUID PlayerUUID;
public final String CurrentServer;
public final int Position;
private ImmutableQueuePlayer(UUID uuid, String server, int position)
{
PlayerUUID = uuid;
CurrentServer = server;
Position = position;
}
}
}

View File

@ -1,33 +1,57 @@
package com.mineplex.clansqueue.service.queue;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import com.mineplex.clansqueue.common.SortableLinkedList;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import com.mineplex.clansqueue.common.EnclosedInteger;
import com.mineplex.clansqueue.common.SortableLinkedList;
import com.mineplex.clansqueue.service.queue.QueuePlayer.ImmutableQueuePlayer;
@ThreadSafe
public class ServerQueue
{
private final ClansServer _server;
@GuardedBy("_sendLock")
private final Map<UUID, String> _sending = new LinkedHashMap<>();
@GuardedBy("_bypassLock")
private final Map<UUID, String> _bypassing = new LinkedHashMap<>();
private final SortableLinkedList<PlayerQueue> _queues = new SortableLinkedList<>();
@GuardedBy("_queueLock")
private final SortableLinkedList<QueuePlayer> _queued = new SortableLinkedList<>();
private final Object _bypassLock = new Object();
private final Object _queueLock = new Object();
private final Object _sendLock = new Object();
private final AtomicBoolean _paused = new AtomicBoolean();
private final Object _pauseLock = new Object();
@GuardedBy("_pauseLock")
private boolean _paused = false;
public ServerQueue(ClansServer server)
{
_server = server;
}
private void sortQueue()
{
synchronized (_queueLock)
{
_queued.sort();
EnclosedInteger position = new EnclosedInteger(1);
_queued.forEach(qp ->
{
qp.Position = position.getAndIncrement();
});
}
}
public ClansServer getServer()
{
return _server;
@ -35,7 +59,15 @@ public class ServerQueue
public boolean isPaused()
{
return _paused.get() || !_server.isOnline();
if (!_server.isOnline())
{
return true;
}
synchronized (_pauseLock)
{
return _paused;
}
}
public Map<UUID, String> getNextSend()
@ -49,12 +81,13 @@ public class ServerQueue
}
}
public Map<UUID, QueuePlayer> getPlayers()
public Map<UUID, ImmutableQueuePlayer> getPlayers()
{
synchronized (_queueLock)
{
Map<UUID, QueuePlayer> players = new LinkedHashMap<>();
_queues.forEach(queue -> queue._players.forEach(qp -> players.put(qp.PlayerUUID, qp)));
Map<UUID, ImmutableQueuePlayer> players = new LinkedHashMap<>();
sortQueue();
_queued.forEach(qp -> players.put(qp.PlayerUUID, qp.immutable()));
return players;
}
@ -72,30 +105,10 @@ public class ServerQueue
{
synchronized (_queueLock)
{
Optional<PlayerQueue> queueOpt = _queues.stream().filter(q -> q._weight == weight).findFirst();
PlayerQueue queue = queueOpt.orElseGet(() ->
{
PlayerQueue creating = new PlayerQueue(weight);
if (_queues.add(creating))
{
_queues.sort();
}
return creating;
});
QueuePlayer player = new QueuePlayer(uuid, currentServer, weight);
_queued.add(player);
QueuePlayer player = new QueuePlayer(uuid, currentServer);
queue._players.add(player);
AtomicInteger position = new AtomicInteger(1);
if (_queues.removeIf(q -> q._players.isEmpty()))
{
_queues.sort();
}
_queues.forEach(q -> q._players.forEach(qp ->
{
qp.Position = position.getAndIncrement();
}));
sortQueue();
if (callback != null)
{
@ -108,17 +121,9 @@ public class ServerQueue
{
synchronized (_queueLock)
{
_queues.forEach(queue -> queue._players.removeIf(player -> player.PlayerUUID.equals(uuid)));
_queued.removeIf(player -> player.PlayerUUID.equals(uuid));
AtomicInteger position = new AtomicInteger(1);
if (_queues.removeIf(q -> q._players.isEmpty()))
{
_queues.sort();
}
_queues.forEach(q -> q._players.forEach(qp ->
{
qp.Position = position.getAndIncrement();
}));
sortQueue();
if (after != null)
{
@ -129,7 +134,10 @@ public class ServerQueue
public void setPaused(boolean paused)
{
_paused.set(paused);
synchronized (_pauseLock)
{
_paused = paused;
}
}
public void updatePositions(int openPlayerSlots)
@ -147,30 +155,18 @@ public class ServerQueue
{
if (!isPaused() && openPlayerSlots > 0)
{
sortQueue();
while (send.size() < openPlayerSlots)
{
if (_queues.removeIf(queue -> queue._players.isEmpty()))
{
_queues.sort();
}
PlayerQueue queue = _queues.peek();
if (queue == null)
QueuePlayer player = _queued.poll();
if (player == null)
{
break;
}
QueuePlayer player = queue._players.poll();
send.put(player.PlayerUUID, player.CurrentServer);
}
sortQueue();
}
AtomicInteger position = new AtomicInteger(1);
if (_queues.removeIf(queue -> queue._players.isEmpty()))
{
_queues.sort();
}
_queues.forEach(queue -> queue._players.forEach(qp ->
{
qp.Position = position.getAndIncrement();
}));
}
if (send.isEmpty())
{
@ -181,42 +177,4 @@ public class ServerQueue
_sending.putAll(send);
}
}
private static class PlayerQueue implements Comparable<PlayerQueue>
{
private final int _weight;
private final Queue<QueuePlayer> _players = new LinkedList<>();
private PlayerQueue(int weight)
{
_weight = weight;
}
@Override
public int compareTo(PlayerQueue queue)
{
if (queue == null)
{
throw new NullPointerException();
}
return Integer.compare(queue._weight, _weight);
}
@Override
public int hashCode()
{
return Integer.hashCode(_weight);
}
@Override
public boolean equals(Object o)
{
if (o == null || !getClass().isInstance(o))
{
return false;
}
return ((PlayerQueue)o)._weight == _weight;
}
}
}

View File

@ -63,7 +63,7 @@ public class HubQueueManager extends MiniClientPlugin<QueuePlayerData>
public enum QueuePriority implements Permission
{
BYPASS(-1, PermissionGroup.CONTENT, PermissionGroup.TRAINEE),
BYPASS(QueueConstant.BYPASS_QUEUE_WEIGHT, PermissionGroup.CONTENT, PermissionGroup.TRAINEE),
PRIORITY(7, PermissionGroup.BUILDER),
ETERNAL(6, PermissionGroup.ETERNAL),
TITAN(5, PermissionGroup.TITAN),