From ff13d07a7c5d5652e283cd4f54a3cfe8eebcb6e5 Mon Sep 17 00:00:00 2001 From: Kenny Date: Tue, 2 May 2017 20:29:36 -0400 Subject: [PATCH] Port redis pubsub messaging API from PE --- Plugins/Mineplex.ServerData/pom.xml | 10 + .../redis/messaging/PubSubJedisClient.java | 356 ++++++++++++++++++ .../redis/messaging/PubSubLibraryClient.java | 61 +++ .../redis/messaging/PubSubMessager.java | 56 +++ .../redis/messaging/PubSubRouter.java | 165 ++++++++ .../redis/messaging/Subscriber.java | 19 + 6 files changed, 667 insertions(+) create mode 100644 Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubJedisClient.java create mode 100644 Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubLibraryClient.java create mode 100644 Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubMessager.java create mode 100644 Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubRouter.java create mode 100644 Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/Subscriber.java diff --git a/Plugins/Mineplex.ServerData/pom.xml b/Plugins/Mineplex.ServerData/pom.xml index 84a00934c..85d99c4e7 100644 --- a/Plugins/Mineplex.ServerData/pom.xml +++ b/Plugins/Mineplex.ServerData/pom.xml @@ -8,6 +8,10 @@ mineplex-parent dev-SNAPSHOT + + + 18.0 + mineplex-serverdata @@ -33,5 +37,11 @@ jooq 3.5.2 + + com.google.guava + guava + ${version.guava} + compile + diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubJedisClient.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubJedisClient.java new file mode 100644 index 000000000..5a8cfa8a5 --- /dev/null +++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubJedisClient.java @@ -0,0 +1,356 @@ +package mineplex.serverdata.redis.messaging; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import mineplex.serverdata.Utility; +import mineplex.serverdata.servers.ConnectionData; + +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisPool; +import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisDataException; + +/** + * A subscription to a Redis pub/sub system through a Jedis client. Includes a publishing mechanism + * as well. + *

+ * Subscribes to Jedis and offers a variety of methods to edit the channels that this listens to + * over time. Does not support pattern-matching, even though Jedis can. Takes a single subscriber to + * inform of incoming messages (on all channels this is subscribed to). + *

+ * For the sake of internal efficiency, this does not protect the sanity or unchangeability of + * arguments passed into its methods. Clients should not generally interact with this directly. + *

+ * The Jedis pub/sub interface is a little confusing, especially when it comes to multithreading. At + * any given time, if this class is subscribed to any channels at all, it will have a single thread + * that is listening for incoming messages from redis with a blocking method. After that listening + * thread is created, we can add and remove subscriptions as desired, but the initial subscription + * and actual listening must be done on its own thread. When all channels are unsubscribed from, the + * listening thread returns. Note that this is stated with about 70% certainty, as the internals of + * the pub/sub mechanism are not entirely clear to me. + *

+ * This class maintains a constant connection to its redis server by subscribing to a base channel. + * This makes it much easier to protect its operation from potentially insane client commands. + *

+ * If the connection to the given redis instance fails or is interrupted, will keep attempting to + * reconnect periodically until destroyed. Publishers and subscribers are not informed of failure in + * any way. + *

+ * When {@link #unsubscribe()} or {@link #destroy()} is called, this class ceases operation. + */ +public class PubSubJedisClient extends JedisPubSub implements PubSubLibraryClient +{ + + private static final long RECONNECT_PERIOD_MILLIS = 800; + private static final String BASE_CHANNEL = "pG8n5jp#"; + + private static final String BOLD = "\u001B[1m"; + private static final String RESET = "\u001B[0m"; + + private final String _id; + private JedisPool _writePool; + private final ConnectionData _readConn; + private JedisPool _readPool; + private final ExecutorService _threadPool = Executors.newCachedThreadPool(); + private volatile Subscriber _sub; + + private final Set _channels = Collections + .newSetFromMap(new ConcurrentHashMap()); + private final Map> _pendingFutures = new ConcurrentHashMap>(); + + private volatile boolean _subscribed; // Is there a base subscription yet? + private volatile boolean _destroyed; // has this been deliberately destroyed? + + /** + * Class constructor. + * + * @param writeTo The connection info for the redis instance this client should publish to. + * @param readFrom The connection info for the redis instance this client to subscribe to. + */ + public PubSubJedisClient(ConnectionData writeTo, ConnectionData readFrom) + { + if (writeTo == null) + { + throw new IllegalArgumentException("redis connection info cannot be null"); + } + + _id = writeTo.getName(); + _writePool = Utility.generatePool(writeTo); + + _readConn = readFrom; + _readPool = Utility.generatePool(readFrom); + + createSubscription(BASE_CHANNEL); + } + + @Override + public final synchronized void setSubscriber(Subscriber sub) + { + _sub = sub; + } + + @Override + public final ListenableFuture addChannel(String channel) + { + SettableFuture ret = _pendingFutures.get(channel); + if (ret == null) + { + ret = SettableFuture.create(); + _pendingFutures.put(channel, ret); + } + + try + { + _channels.add(channel); + + if (_subscribed) + { // Already has a subscription thread and can just add a new channel to it. + subscribe(channel); + } + } catch (Exception ex) + { + log("Encountered issue subscribing to a channel."); + ex.printStackTrace(); + ret.setException(ex); + } + + return ret; + } + + @Override + public final void removeChannel(String channel) + { + if (BASE_CHANNEL.equals(channel)) + { // Protects the base subscription + return; + } + + _channels.remove(channel); + + if (_subscribed) + { + unsubscribe(channel); + } + } + + @Override + public final void unsubscribe() + { + destroy(); + } + + @Override + public final void destroy() + { + _destroyed = true; + try + { + super.unsubscribe(); + } catch (NullPointerException e) + { + } + + for (SettableFuture fut : _pendingFutures.values()) + { + fut.set(false); + } + } + + @Override + public final void onMessage(String channel, String message) + { + if (_sub == null || BASE_CHANNEL.equals(channel)) + { + return; + } + + try + { + _sub.onMessage(channel, message); + } catch (Exception e) + { + e.printStackTrace(); + } + } + + @Override + public final ListenableFuture publish(final String channel, final String message) + { + final SettableFuture ret = SettableFuture.create(); + _threadPool.execute(new Runnable() + { + @Override + public void run() + { + Jedis bJedis = null; + try + { + bJedis = _writePool.getResource(); + bJedis.publish(channel, message); + _writePool.returnResource((Jedis) bJedis); + ret.set(true); + + } catch (Exception e) + { + log("Encountered issue while publishing a message."); + e.printStackTrace(); + if (bJedis != null) + { + _writePool.returnBrokenResource((Jedis) bJedis); + } + ret.set(false); + } + } + }); + return ret; + } + + // Confirms successful subscriptions/unsubscriptions. + @Override + public void onSubscribe(String channel, int subscribedChannels) + { + + // informs subscriber that this subscription completed successfully + SettableFuture fut = _pendingFutures.remove(channel); + if (fut != null) + { + fut.set(true); + } + + if (!_subscribed) + { + for (String subscribeTo : _channels) + { + subscribe(subscribeTo); + } + } + _subscribed = true; + + log("Subscribed to channel: " + channel); + } + + @Override + public void onUnsubscribe(String channel, int subscribedChannels) + { + log("Unsubscribed from channel: " + channel); + } + + /** + * Creates the initial listening thread which blocks as it polls redis for new messages. + * Subsequent subscriptions can simply be added using {@link #subscribe(String...)} after the + * subscription thread has been created. + * + * @param firstChannel The first channel to initially subscribe to. If you do not have a first + * channel, there is no reason to create a subscriber thread yet. + */ + private void createSubscription(final String firstChannel) + { + + final JedisPubSub pubsub = this; + + new Thread(new Runnable() + { + @Override + public void run() + { + + boolean first = true; + + while (!_destroyed) + { + + if (!first) + { + log("Jedis connection to " + _readConn.getHost() + ":" + + _readConn.getPort() + + " failed or was interrupted, attempting to reconnect"); + } + first = false; + + Jedis jedisInstance = null; + + try + { + // gets a non-thread-safe jedis instance from the thread-safe pool. + jedisInstance = _readPool.getResource(); + + log("Creating initial jedis subscription to channel " + firstChannel); + // this will block as long as there are subscriptions + jedisInstance.subscribe(pubsub, firstChannel); + + log("jedisInstance.subscribe() returned, subscription over."); + + // when the subscription ends (subscribe() returns), returns the instance to + // the pool + _readPool.returnResource(jedisInstance); + + } catch (JedisConnectionException e) + { + log("Jedis connection encountered an issue."); + e.printStackTrace(); + if (jedisInstance != null) + { + _readPool.returnBrokenResource((Jedis) jedisInstance); + } + + } catch (JedisDataException e) + { + log("Jedis connection encountered an issue."); + e.printStackTrace(); + if (jedisInstance != null) + { + _readPool.returnBrokenResource((Jedis) jedisInstance); + } + } + _subscribed = false; + + // sleeps for a short pause, rather than constantly retrying connection + if (!_destroyed) + { + try + { + Thread.sleep(RECONNECT_PERIOD_MILLIS); + } catch (InterruptedException e) + { + _destroyed = true; + log("Reconnection pause thread was interrupted."); + e.printStackTrace(); + } + } + } + } + }).start(); + } + + // This implementation does not support pattern-matching subscriptions + @Override + public void onPMessage(String pattern, String channel, String message) + { + } + + @Override + public void onPSubscribe(String pattern, int subscribedChannels) + { + } + + @Override + public void onPUnsubscribe(String pattern, int subscribedChannels) + { + } + + private void log(String msg) + { + System.out.println(BOLD + "[" + getClass().getSimpleName() + + (_id != null && !_id.isEmpty() ? " " + _id : "") + "] " + RESET + msg); + } + +} diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubLibraryClient.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubLibraryClient.java new file mode 100644 index 000000000..4a9f2820c --- /dev/null +++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubLibraryClient.java @@ -0,0 +1,61 @@ +package mineplex.serverdata.redis.messaging; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * A multi-channel subscription and publisher to a pub/sub messaging implementation. An interface to + * the actual low-level pub/sub library, whatever it may be. + * + * For the sake of internal efficiency, this makes no guarantees for the sanity or unchangeability + * of arguments passed into its methods. Clients should not generally interact with this directly. + */ +public interface PubSubLibraryClient +{ + + /** + * Publishes a message to all subscribers of a given channel. + * + * @param channel The channel to publish the message on. + * @param message The message to send. + * @return A future object that will complete after an unknown amount of time with + * false if for some locally known reason the message definitely could not + * be published, else completes with true. + */ + ListenableFuture publish(String channel, String message); + + /** + * Adds a channel to this subscription. + * + * @param channel The channel to add. Should not change after being passed in. + * @return The asynchronous, future result of this attempt to add the channel. Will have + * true when the subscription starts successfully. + */ + ListenableFuture addChannel(String channel); + + /** + * Removes a channel from this subscription. + * + * @param channel The channel to remove. Should not change after being passed in. + */ + void removeChannel(String channel); + + /** + * Removes all channels from this subscription, kills its connections, and relinquishes any + * resources it was occupying. + *

+ * Depending on the implementation, once a subscription has been destroyed, it may not be + * reusable and it may be necessary to construct a new one in order to resume. + *

+ * Call this when the subscription is no longer being used. Holding unnecessary connections can + * cause serious performance and other issues on both ends. + */ + void destroy(); + + /** + * Sets the subscriber to inform of messages received by this subscription. + * + * @param sub The listener for this subscription. + */ + void setSubscriber(Subscriber sub); + +} diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubMessager.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubMessager.java new file mode 100644 index 000000000..ea45109e1 --- /dev/null +++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubMessager.java @@ -0,0 +1,56 @@ +package mineplex.serverdata.redis.messaging; + +import com.google.common.util.concurrent.ListenableFuture; + +/** + * Messager for standard pub/sub model. Handles multiple publishers and subscribers. + *

+ * All messaging is asynchronous and non-blocking, even to local subscribers. + *

+ * For more about the pub/sub messaging paradigm, see http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern + */ +public interface PubSubMessager +{ + + /** + * Publishes a message to all subscribers of a given channel. + *

+ * Publishes to all connected subscribers, including local ones. + * + * @param channel The channel to publish the message on. + * @param message The message to send. + * @return A future object that will complete after an unknown amount of time with + * false if for some locally known reason the message definitely could not + * be published, else completes with true (which does not guarantee it + * succeeded 100%). + */ + ListenableFuture publish(String channel, String message); + + /** + * Subscribes to a messaging channel. + *

+ * When incoming messages arrive, the subscriber is called from an arbitrary new thread. + * + * @param channel The channel to subscribe to. + * @param sub The subscriber to inform of incoming messages. + * @return The asynchronous, future result of this attempt to subscribe to the channel. Will + * have true when the subscription starts successfully. + */ + ListenableFuture subscribe(String channel, Subscriber sub); + + /** + * Unsubscribes from a messaging channel. + * + * @param channel The channel to unsubscribe from. + * @param sub The subscriber to stop informing of incoming messages. + */ + void unsubscribe(String channel, Subscriber sub); + + /** + * Attempts to gracefully shut down this messager. Generally irreversible. + */ + void shutdown(); + +} diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubRouter.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubRouter.java new file mode 100644 index 000000000..453b90743 --- /dev/null +++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/PubSubRouter.java @@ -0,0 +1,165 @@ +package mineplex.serverdata.redis.messaging; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * A pub/sub messager that simply routes messages to some underlying pub/sub implementation, which + * is in turn represented by a multi-channel subscription and a publishing mechanism. + *

+ * This class handles: + *

    + *
  1. Providing a modular messaging interface that is thread-safe. + *
  2. Protecting pub/sub implementations from some bad client behavior/data. + *
  3. Routing messages for multiple subscribers to the same channel(s). + *
+ */ +public class PubSubRouter implements PubSubMessager, Subscriber +{ + + private final PubSubLibraryClient _pubSubClient; + private final Map> _subscribers; + + private final ExecutorService _threadPool; + + public PubSubRouter(PubSubLibraryClient client) + { + if (client == null) + { + throw new IllegalArgumentException("pubsub client cannot be null"); + } + + this._pubSubClient = client; + this._pubSubClient.setSubscriber(this); + this._subscribers = new ConcurrentHashMap>(); + + this._threadPool = Executors.newCachedThreadPool(); + } + + @Override + public final ListenableFuture publish(String channel, String message) + { + if (channel == null || channel.isEmpty()) + { + throw new IllegalArgumentException("channel cannot be null or empty"); + } + + // messages of 0 length are allowed. Null messages are treated as messages of 0 length. + if (message == null) + { + message = ""; + } + + return _pubSubClient.publish(channel, message); + } + + @Override + public final ListenableFuture subscribe(String channel, Subscriber sub) + { + if (channel == null || channel.isEmpty() || sub == null) + { + throw new IllegalArgumentException("params cannot be null and channel cannot be empty"); + } + + ListenableFuture ret = null; + + Set channelSubs = _subscribers.get(channel); + if (channelSubs == null) + { + // uses CopyOnWriteArraySet for fast and consistent iteration (forwarding messages to + // subscribers) but slow writes (adding/removing subscribers). + // See a discussion of the issue here: + // http://stackoverflow.com/questions/6720396/different-types-of-thread-safe-sets-in-java + channelSubs = new CopyOnWriteArraySet(); + _subscribers.put(channel, channelSubs); + + // starts a jedis subscription to the channel if there were no subscribers before + ret = _pubSubClient.addChannel(channel); + + } else + { + ret = SettableFuture.create(); + ((SettableFuture) ret).set(true); // already subscribed, calls back immediately + } + + channelSubs.add(sub); + + return ret; + } + + @Override + public final void unsubscribe(String channel, Subscriber sub) + { + if (channel == null || channel.isEmpty() || sub == null) + { + throw new IllegalArgumentException("params cannot be null and channel cannot be empty"); + } + + Set channelSubs = _subscribers.get(channel); + if (channelSubs == null) + { // no subscribers for the channel to begin with. + return; + } + + channelSubs.remove(sub); + + // stops the subscription to this channel if the unsubscribed was the last subscriber + if (channelSubs.isEmpty()) + { + _subscribers.remove(channel); + _pubSubClient.removeChannel(channel); + } + } + + @Override + public final void onMessage(final String channel, final String message) + { + if (channel == null || message == null || channel.isEmpty()) + { + return; + } + + Set channelSubs = _subscribers.get(channel); + + if (channelSubs == null) + { // We should not still be listening + _pubSubClient.removeChannel(channel); + return; + } else if (channelSubs.isEmpty()) + { + _subscribers.remove(channel); + _pubSubClient.removeChannel(channel); + return; + } + + for (final Subscriber sub : channelSubs) + { + + // Gives subscribers their own thread from the thread pool in which to react to the + // message. + // Avoids interruptions and other problems while iterating over the subscriber set. + _threadPool.execute(new Runnable() + { + @Override + public void run() + { + sub.onMessage(channel, message); + } + }); + } + } + + @Override + public void shutdown() + { + _pubSubClient.destroy(); + } + +} diff --git a/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/Subscriber.java b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/Subscriber.java new file mode 100644 index 000000000..5ecc09796 --- /dev/null +++ b/Plugins/Mineplex.ServerData/src/mineplex/serverdata/redis/messaging/Subscriber.java @@ -0,0 +1,19 @@ +package mineplex.serverdata.redis.messaging; + +/** + * A subscriber to a pub/sub channel. + */ +public interface Subscriber +{ + + /** + * Called when a message is sent on a channel that this is subscribed to. + *

+ * No guarantees are made about what thread this will be called from. + * + * @param channel The channel that the message was sent on. + * @param message The message that was received. + */ + void onMessage(String channel, String message); + +}