diff --git a/Plugins/Mineplex.ServerData/pom.xml b/Plugins/Mineplex.ServerData/pom.xml
index 84a00934c..85d99c4e7 100644
--- a/Plugins/Mineplex.ServerData/pom.xml
+++ b/Plugins/Mineplex.ServerData/pom.xml
@@ -8,6 +8,10 @@
mineplex-parent
dev-SNAPSHOT
+
+
+ 18.0
+
mineplex-serverdata
@@ -33,5 +37,11 @@
jooq
3.5.2
+
+ com.google.guava
+ guava
+ ${version.guava}
+ compile
+
diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubJedisClient.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubJedisClient.java
new file mode 100644
index 000000000..5a8cfa8a5
--- /dev/null
+++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubJedisClient.java
@@ -0,0 +1,356 @@
+package mineplex.serverdata.redis.messaging;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import mineplex.serverdata.Utility;
+import mineplex.serverdata.servers.ConnectionData;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPubSub;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+import redis.clients.jedis.exceptions.JedisDataException;
+
+/**
+ * A subscription to a Redis pub/sub system through a Jedis client. Includes a publishing mechanism
+ * as well.
+ *
+ * Subscribes to Jedis and offers a variety of methods to edit the channels that this listens to
+ * over time. Does not support pattern-matching, even though Jedis can. Takes a single subscriber to
+ * inform of incoming messages (on all channels this is subscribed to).
+ *
+ * For the sake of internal efficiency, this does not protect the sanity or unchangeability of
+ * arguments passed into its methods. Clients should not generally interact with this directly.
+ *
+ * The Jedis pub/sub interface is a little confusing, especially when it comes to multithreading. At
+ * any given time, if this class is subscribed to any channels at all, it will have a single thread
+ * that is listening for incoming messages from redis with a blocking method. After that listening
+ * thread is created, we can add and remove subscriptions as desired, but the initial subscription
+ * and actual listening must be done on its own thread. When all channels are unsubscribed from, the
+ * listening thread returns. Note that this is stated with about 70% certainty, as the internals of
+ * the pub/sub mechanism are not entirely clear to me.
+ *
+ * This class maintains a constant connection to its redis server by subscribing to a base channel.
+ * This makes it much easier to protect its operation from potentially insane client commands.
+ *
+ * If the connection to the given redis instance fails or is interrupted, will keep attempting to
+ * reconnect periodically until destroyed. Publishers and subscribers are not informed of failure in
+ * any way.
+ *
+ * When {@link #unsubscribe()} or {@link #destroy()} is called, this class ceases operation.
+ */
+public class PubSubJedisClient extends JedisPubSub implements PubSubLibraryClient
+{
+
+ private static final long RECONNECT_PERIOD_MILLIS = 800;
+ private static final String BASE_CHANNEL = "pG8n5jp#";
+
+ private static final String BOLD = "\u001B[1m";
+ private static final String RESET = "\u001B[0m";
+
+ private final String _id;
+ private JedisPool _writePool;
+ private final ConnectionData _readConn;
+ private JedisPool _readPool;
+ private final ExecutorService _threadPool = Executors.newCachedThreadPool();
+ private volatile Subscriber _sub;
+
+ private final Set _channels = Collections
+ .newSetFromMap(new ConcurrentHashMap());
+ private final Map> _pendingFutures = new ConcurrentHashMap>();
+
+ private volatile boolean _subscribed; // Is there a base subscription yet?
+ private volatile boolean _destroyed; // has this been deliberately destroyed?
+
+ /**
+ * Class constructor.
+ *
+ * @param writeTo The connection info for the redis instance this client should publish to.
+ * @param readFrom The connection info for the redis instance this client to subscribe to.
+ */
+ public PubSubJedisClient(ConnectionData writeTo, ConnectionData readFrom)
+ {
+ if (writeTo == null)
+ {
+ throw new IllegalArgumentException("redis connection info cannot be null");
+ }
+
+ _id = writeTo.getName();
+ _writePool = Utility.generatePool(writeTo);
+
+ _readConn = readFrom;
+ _readPool = Utility.generatePool(readFrom);
+
+ createSubscription(BASE_CHANNEL);
+ }
+
+ @Override
+ public final synchronized void setSubscriber(Subscriber sub)
+ {
+ _sub = sub;
+ }
+
+ @Override
+ public final ListenableFuture addChannel(String channel)
+ {
+ SettableFuture ret = _pendingFutures.get(channel);
+ if (ret == null)
+ {
+ ret = SettableFuture.create();
+ _pendingFutures.put(channel, ret);
+ }
+
+ try
+ {
+ _channels.add(channel);
+
+ if (_subscribed)
+ { // Already has a subscription thread and can just add a new channel to it.
+ subscribe(channel);
+ }
+ } catch (Exception ex)
+ {
+ log("Encountered issue subscribing to a channel.");
+ ex.printStackTrace();
+ ret.setException(ex);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public final void removeChannel(String channel)
+ {
+ if (BASE_CHANNEL.equals(channel))
+ { // Protects the base subscription
+ return;
+ }
+
+ _channels.remove(channel);
+
+ if (_subscribed)
+ {
+ unsubscribe(channel);
+ }
+ }
+
+ @Override
+ public final void unsubscribe()
+ {
+ destroy();
+ }
+
+ @Override
+ public final void destroy()
+ {
+ _destroyed = true;
+ try
+ {
+ super.unsubscribe();
+ } catch (NullPointerException e)
+ {
+ }
+
+ for (SettableFuture fut : _pendingFutures.values())
+ {
+ fut.set(false);
+ }
+ }
+
+ @Override
+ public final void onMessage(String channel, String message)
+ {
+ if (_sub == null || BASE_CHANNEL.equals(channel))
+ {
+ return;
+ }
+
+ try
+ {
+ _sub.onMessage(channel, message);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public final ListenableFuture publish(final String channel, final String message)
+ {
+ final SettableFuture ret = SettableFuture.create();
+ _threadPool.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ Jedis bJedis = null;
+ try
+ {
+ bJedis = _writePool.getResource();
+ bJedis.publish(channel, message);
+ _writePool.returnResource((Jedis) bJedis);
+ ret.set(true);
+
+ } catch (Exception e)
+ {
+ log("Encountered issue while publishing a message.");
+ e.printStackTrace();
+ if (bJedis != null)
+ {
+ _writePool.returnBrokenResource((Jedis) bJedis);
+ }
+ ret.set(false);
+ }
+ }
+ });
+ return ret;
+ }
+
+ // Confirms successful subscriptions/unsubscriptions.
+ @Override
+ public void onSubscribe(String channel, int subscribedChannels)
+ {
+
+ // informs subscriber that this subscription completed successfully
+ SettableFuture fut = _pendingFutures.remove(channel);
+ if (fut != null)
+ {
+ fut.set(true);
+ }
+
+ if (!_subscribed)
+ {
+ for (String subscribeTo : _channels)
+ {
+ subscribe(subscribeTo);
+ }
+ }
+ _subscribed = true;
+
+ log("Subscribed to channel: " + channel);
+ }
+
+ @Override
+ public void onUnsubscribe(String channel, int subscribedChannels)
+ {
+ log("Unsubscribed from channel: " + channel);
+ }
+
+ /**
+ * Creates the initial listening thread which blocks as it polls redis for new messages.
+ * Subsequent subscriptions can simply be added using {@link #subscribe(String...)} after the
+ * subscription thread has been created.
+ *
+ * @param firstChannel The first channel to initially subscribe to. If you do not have a first
+ * channel, there is no reason to create a subscriber thread yet.
+ */
+ private void createSubscription(final String firstChannel)
+ {
+
+ final JedisPubSub pubsub = this;
+
+ new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ boolean first = true;
+
+ while (!_destroyed)
+ {
+
+ if (!first)
+ {
+ log("Jedis connection to " + _readConn.getHost() + ":"
+ + _readConn.getPort()
+ + " failed or was interrupted, attempting to reconnect");
+ }
+ first = false;
+
+ Jedis jedisInstance = null;
+
+ try
+ {
+ // gets a non-thread-safe jedis instance from the thread-safe pool.
+ jedisInstance = _readPool.getResource();
+
+ log("Creating initial jedis subscription to channel " + firstChannel);
+ // this will block as long as there are subscriptions
+ jedisInstance.subscribe(pubsub, firstChannel);
+
+ log("jedisInstance.subscribe() returned, subscription over.");
+
+ // when the subscription ends (subscribe() returns), returns the instance to
+ // the pool
+ _readPool.returnResource(jedisInstance);
+
+ } catch (JedisConnectionException e)
+ {
+ log("Jedis connection encountered an issue.");
+ e.printStackTrace();
+ if (jedisInstance != null)
+ {
+ _readPool.returnBrokenResource((Jedis) jedisInstance);
+ }
+
+ } catch (JedisDataException e)
+ {
+ log("Jedis connection encountered an issue.");
+ e.printStackTrace();
+ if (jedisInstance != null)
+ {
+ _readPool.returnBrokenResource((Jedis) jedisInstance);
+ }
+ }
+ _subscribed = false;
+
+ // sleeps for a short pause, rather than constantly retrying connection
+ if (!_destroyed)
+ {
+ try
+ {
+ Thread.sleep(RECONNECT_PERIOD_MILLIS);
+ } catch (InterruptedException e)
+ {
+ _destroyed = true;
+ log("Reconnection pause thread was interrupted.");
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }).start();
+ }
+
+ // This implementation does not support pattern-matching subscriptions
+ @Override
+ public void onPMessage(String pattern, String channel, String message)
+ {
+ }
+
+ @Override
+ public void onPSubscribe(String pattern, int subscribedChannels)
+ {
+ }
+
+ @Override
+ public void onPUnsubscribe(String pattern, int subscribedChannels)
+ {
+ }
+
+ private void log(String msg)
+ {
+ System.out.println(BOLD + "[" + getClass().getSimpleName()
+ + (_id != null && !_id.isEmpty() ? " " + _id : "") + "] " + RESET + msg);
+ }
+
+}
diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubLibraryClient.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubLibraryClient.java
new file mode 100644
index 000000000..4a9f2820c
--- /dev/null
+++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubLibraryClient.java
@@ -0,0 +1,61 @@
+package mineplex.serverdata.redis.messaging;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * A multi-channel subscription and publisher to a pub/sub messaging implementation. An interface to
+ * the actual low-level pub/sub library, whatever it may be.
+ *
+ * For the sake of internal efficiency, this makes no guarantees for the sanity or unchangeability
+ * of arguments passed into its methods. Clients should not generally interact with this directly.
+ */
+public interface PubSubLibraryClient
+{
+
+ /**
+ * Publishes a message to all subscribers of a given channel.
+ *
+ * @param channel The channel to publish the message on.
+ * @param message The message to send.
+ * @return A future object that will complete after an unknown amount of time with
+ * false
if for some locally known reason the message definitely could not
+ * be published, else completes with true
.
+ */
+ ListenableFuture publish(String channel, String message);
+
+ /**
+ * Adds a channel to this subscription.
+ *
+ * @param channel The channel to add. Should not change after being passed in.
+ * @return The asynchronous, future result of this attempt to add the channel. Will have
+ * true
when the subscription starts successfully.
+ */
+ ListenableFuture addChannel(String channel);
+
+ /**
+ * Removes a channel from this subscription.
+ *
+ * @param channel The channel to remove. Should not change after being passed in.
+ */
+ void removeChannel(String channel);
+
+ /**
+ * Removes all channels from this subscription, kills its connections, and relinquishes any
+ * resources it was occupying.
+ *
+ * Depending on the implementation, once a subscription has been destroyed, it may not be
+ * reusable and it may be necessary to construct a new one in order to resume.
+ *
+ * Call this when the subscription is no longer being used. Holding unnecessary connections can
+ * cause serious performance and other issues on both ends.
+ */
+ void destroy();
+
+ /**
+ * Sets the subscriber to inform of messages received by this subscription.
+ *
+ * @param sub The listener for this subscription.
+ */
+ void setSubscriber(Subscriber sub);
+
+}
diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubMessager.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubMessager.java
new file mode 100644
index 000000000..ea45109e1
--- /dev/null
+++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubMessager.java
@@ -0,0 +1,56 @@
+package mineplex.serverdata.redis.messaging;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Messager for standard pub/sub model. Handles multiple publishers and subscribers.
+ *
+ * All messaging is asynchronous and non-blocking, even to local subscribers.
+ *
+ * For more about the pub/sub messaging paradigm, see http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern
+ */
+public interface PubSubMessager
+{
+
+ /**
+ * Publishes a message to all subscribers of a given channel.
+ *
+ * Publishes to all connected subscribers, including local ones.
+ *
+ * @param channel The channel to publish the message on.
+ * @param message The message to send.
+ * @return A future object that will complete after an unknown amount of time with
+ * false
if for some locally known reason the message definitely could not
+ * be published, else completes with true
(which does not guarantee it
+ * succeeded 100%).
+ */
+ ListenableFuture publish(String channel, String message);
+
+ /**
+ * Subscribes to a messaging channel.
+ *
+ * When incoming messages arrive, the subscriber is called from an arbitrary new thread.
+ *
+ * @param channel The channel to subscribe to.
+ * @param sub The subscriber to inform of incoming messages.
+ * @return The asynchronous, future result of this attempt to subscribe to the channel. Will
+ * have true
when the subscription starts successfully.
+ */
+ ListenableFuture subscribe(String channel, Subscriber sub);
+
+ /**
+ * Unsubscribes from a messaging channel.
+ *
+ * @param channel The channel to unsubscribe from.
+ * @param sub The subscriber to stop informing of incoming messages.
+ */
+ void unsubscribe(String channel, Subscriber sub);
+
+ /**
+ * Attempts to gracefully shut down this messager. Generally irreversible.
+ */
+ void shutdown();
+
+}
diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubRouter.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubRouter.java
new file mode 100644
index 000000000..453b90743
--- /dev/null
+++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubRouter.java
@@ -0,0 +1,165 @@
+package mineplex.serverdata.redis.messaging;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * A pub/sub messager that simply routes messages to some underlying pub/sub implementation, which
+ * is in turn represented by a multi-channel subscription and a publishing mechanism.
+ *
+ * This class handles:
+ *
+ * - Providing a modular messaging interface that is thread-safe.
+ *
- Protecting pub/sub implementations from some bad client behavior/data.
+ *
- Routing messages for multiple subscribers to the same channel(s).
+ *
+ */
+public class PubSubRouter implements PubSubMessager, Subscriber
+{
+
+ private final PubSubLibraryClient _pubSubClient;
+ private final Map> _subscribers;
+
+ private final ExecutorService _threadPool;
+
+ public PubSubRouter(PubSubLibraryClient client)
+ {
+ if (client == null)
+ {
+ throw new IllegalArgumentException("pubsub client cannot be null");
+ }
+
+ this._pubSubClient = client;
+ this._pubSubClient.setSubscriber(this);
+ this._subscribers = new ConcurrentHashMap>();
+
+ this._threadPool = Executors.newCachedThreadPool();
+ }
+
+ @Override
+ public final ListenableFuture publish(String channel, String message)
+ {
+ if (channel == null || channel.isEmpty())
+ {
+ throw new IllegalArgumentException("channel cannot be null or empty");
+ }
+
+ // messages of 0 length are allowed. Null messages are treated as messages of 0 length.
+ if (message == null)
+ {
+ message = "";
+ }
+
+ return _pubSubClient.publish(channel, message);
+ }
+
+ @Override
+ public final ListenableFuture subscribe(String channel, Subscriber sub)
+ {
+ if (channel == null || channel.isEmpty() || sub == null)
+ {
+ throw new IllegalArgumentException("params cannot be null and channel cannot be empty");
+ }
+
+ ListenableFuture ret = null;
+
+ Set channelSubs = _subscribers.get(channel);
+ if (channelSubs == null)
+ {
+ // uses CopyOnWriteArraySet for fast and consistent iteration (forwarding messages to
+ // subscribers) but slow writes (adding/removing subscribers).
+ // See a discussion of the issue here:
+ // http://stackoverflow.com/questions/6720396/different-types-of-thread-safe-sets-in-java
+ channelSubs = new CopyOnWriteArraySet();
+ _subscribers.put(channel, channelSubs);
+
+ // starts a jedis subscription to the channel if there were no subscribers before
+ ret = _pubSubClient.addChannel(channel);
+
+ } else
+ {
+ ret = SettableFuture.create();
+ ((SettableFuture) ret).set(true); // already subscribed, calls back immediately
+ }
+
+ channelSubs.add(sub);
+
+ return ret;
+ }
+
+ @Override
+ public final void unsubscribe(String channel, Subscriber sub)
+ {
+ if (channel == null || channel.isEmpty() || sub == null)
+ {
+ throw new IllegalArgumentException("params cannot be null and channel cannot be empty");
+ }
+
+ Set channelSubs = _subscribers.get(channel);
+ if (channelSubs == null)
+ { // no subscribers for the channel to begin with.
+ return;
+ }
+
+ channelSubs.remove(sub);
+
+ // stops the subscription to this channel if the unsubscribed was the last subscriber
+ if (channelSubs.isEmpty())
+ {
+ _subscribers.remove(channel);
+ _pubSubClient.removeChannel(channel);
+ }
+ }
+
+ @Override
+ public final void onMessage(final String channel, final String message)
+ {
+ if (channel == null || message == null || channel.isEmpty())
+ {
+ return;
+ }
+
+ Set channelSubs = _subscribers.get(channel);
+
+ if (channelSubs == null)
+ { // We should not still be listening
+ _pubSubClient.removeChannel(channel);
+ return;
+ } else if (channelSubs.isEmpty())
+ {
+ _subscribers.remove(channel);
+ _pubSubClient.removeChannel(channel);
+ return;
+ }
+
+ for (final Subscriber sub : channelSubs)
+ {
+
+ // Gives subscribers their own thread from the thread pool in which to react to the
+ // message.
+ // Avoids interruptions and other problems while iterating over the subscriber set.
+ _threadPool.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ sub.onMessage(channel, message);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void shutdown()
+ {
+ _pubSubClient.destroy();
+ }
+
+}
diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/Subscriber.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/Subscriber.java
new file mode 100644
index 000000000..5ecc09796
--- /dev/null
+++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/Subscriber.java
@@ -0,0 +1,19 @@
+package mineplex.serverdata.redis.messaging;
+
+/**
+ * A subscriber to a pub/sub channel.
+ */
+public interface Subscriber
+{
+
+ /**
+ * Called when a message is sent on a channel that this is subscribed to.
+ *
+ * No guarantees are made about what thread this will be called from.
+ *
+ * @param channel The channel that the message was sent on.
+ * @param message The message that was received.
+ */
+ void onMessage(String channel, String message);
+
+}