Port redis pubsub messaging API from PE

This commit is contained in:
Kenny 2017-05-02 20:29:36 -04:00
parent 2a92d312b2
commit ff13d07a7c
6 changed files with 667 additions and 0 deletions

View File

@ -9,6 +9,10 @@
<version>dev-SNAPSHOT</version> <version>dev-SNAPSHOT</version>
</parent> </parent>
<properties>
<version.guava>18.0</version.guava>
</properties>
<artifactId>mineplex-serverdata</artifactId> <artifactId>mineplex-serverdata</artifactId>
<dependencies> <dependencies>
@ -33,5 +37,11 @@
<artifactId>jooq</artifactId> <artifactId>jooq</artifactId>
<version>3.5.2</version> <version>3.5.2</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${version.guava}</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,356 @@
package mineplex.serverdata.redis.messaging;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import mineplex.serverdata.Utility;
import mineplex.serverdata.servers.ConnectionData;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
/**
* A subscription to a Redis pub/sub system through a Jedis client. Includes a publishing mechanism
* as well.
* <p>
* Subscribes to Jedis and offers a variety of methods to edit the channels that this listens to
* over time. Does not support pattern-matching, even though Jedis can. Takes a single subscriber to
* inform of incoming messages (on all channels this is subscribed to).
* <p>
* For the sake of internal efficiency, this does not protect the sanity or unchangeability of
* arguments passed into its methods. Clients should not generally interact with this directly.
* <p>
* The Jedis pub/sub interface is a little confusing, especially when it comes to multithreading. At
* any given time, if this class is subscribed to any channels at all, it will have a single thread
* that is listening for incoming messages from redis with a blocking method. After that listening
* thread is created, we can add and remove subscriptions as desired, but the initial subscription
* and actual listening must be done on its own thread. When all channels are unsubscribed from, the
* listening thread returns. Note that this is stated with about 70% certainty, as the internals of
* the pub/sub mechanism are not entirely clear to me.
* <p>
* This class maintains a constant connection to its redis server by subscribing to a base channel.
* This makes it much easier to protect its operation from potentially insane client commands.
* <p>
* If the connection to the given redis instance fails or is interrupted, will keep attempting to
* reconnect periodically until destroyed. Publishers and subscribers are not informed of failure in
* any way.
* <p>
* When {@link #unsubscribe()} or {@link #destroy()} is called, this class ceases operation.
*/
public class PubSubJedisClient extends JedisPubSub implements PubSubLibraryClient
{
private static final long RECONNECT_PERIOD_MILLIS = 800;
private static final String BASE_CHANNEL = "pG8n5jp#";
private static final String BOLD = "\u001B[1m";
private static final String RESET = "\u001B[0m";
private final String _id;
private JedisPool _writePool;
private final ConnectionData _readConn;
private JedisPool _readPool;
private final ExecutorService _threadPool = Executors.newCachedThreadPool();
private volatile Subscriber _sub;
private final Set<String> _channels = Collections
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Map<String, SettableFuture<Boolean>> _pendingFutures = new ConcurrentHashMap<String, SettableFuture<Boolean>>();
private volatile boolean _subscribed; // Is there a base subscription yet?
private volatile boolean _destroyed; // has this been deliberately destroyed?
/**
* Class constructor.
*
* @param writeTo The connection info for the redis instance this client should publish to.
* @param readFrom The connection info for the redis instance this client to subscribe to.
*/
public PubSubJedisClient(ConnectionData writeTo, ConnectionData readFrom)
{
if (writeTo == null)
{
throw new IllegalArgumentException("redis connection info cannot be null");
}
_id = writeTo.getName();
_writePool = Utility.generatePool(writeTo);
_readConn = readFrom;
_readPool = Utility.generatePool(readFrom);
createSubscription(BASE_CHANNEL);
}
@Override
public final synchronized void setSubscriber(Subscriber sub)
{
_sub = sub;
}
@Override
public final ListenableFuture<Boolean> addChannel(String channel)
{
SettableFuture<Boolean> ret = _pendingFutures.get(channel);
if (ret == null)
{
ret = SettableFuture.create();
_pendingFutures.put(channel, ret);
}
try
{
_channels.add(channel);
if (_subscribed)
{ // Already has a subscription thread and can just add a new channel to it.
subscribe(channel);
}
} catch (Exception ex)
{
log("Encountered issue subscribing to a channel.");
ex.printStackTrace();
ret.setException(ex);
}
return ret;
}
@Override
public final void removeChannel(String channel)
{
if (BASE_CHANNEL.equals(channel))
{ // Protects the base subscription
return;
}
_channels.remove(channel);
if (_subscribed)
{
unsubscribe(channel);
}
}
@Override
public final void unsubscribe()
{
destroy();
}
@Override
public final void destroy()
{
_destroyed = true;
try
{
super.unsubscribe();
} catch (NullPointerException e)
{
}
for (SettableFuture<Boolean> fut : _pendingFutures.values())
{
fut.set(false);
}
}
@Override
public final void onMessage(String channel, String message)
{
if (_sub == null || BASE_CHANNEL.equals(channel))
{
return;
}
try
{
_sub.onMessage(channel, message);
} catch (Exception e)
{
e.printStackTrace();
}
}
@Override
public final ListenableFuture<Boolean> publish(final String channel, final String message)
{
final SettableFuture<Boolean> ret = SettableFuture.create();
_threadPool.execute(new Runnable()
{
@Override
public void run()
{
Jedis bJedis = null;
try
{
bJedis = _writePool.getResource();
bJedis.publish(channel, message);
_writePool.returnResource((Jedis) bJedis);
ret.set(true);
} catch (Exception e)
{
log("Encountered issue while publishing a message.");
e.printStackTrace();
if (bJedis != null)
{
_writePool.returnBrokenResource((Jedis) bJedis);
}
ret.set(false);
}
}
});
return ret;
}
// Confirms successful subscriptions/unsubscriptions.
@Override
public void onSubscribe(String channel, int subscribedChannels)
{
// informs subscriber that this subscription completed successfully
SettableFuture<Boolean> fut = _pendingFutures.remove(channel);
if (fut != null)
{
fut.set(true);
}
if (!_subscribed)
{
for (String subscribeTo : _channels)
{
subscribe(subscribeTo);
}
}
_subscribed = true;
log("Subscribed to channel: " + channel);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels)
{
log("Unsubscribed from channel: " + channel);
}
/**
* Creates the initial listening thread which blocks as it polls redis for new messages.
* Subsequent subscriptions can simply be added using {@link #subscribe(String...)} after the
* subscription thread has been created.
*
* @param firstChannel The first channel to initially subscribe to. If you do not have a first
* channel, there is no reason to create a subscriber thread yet.
*/
private void createSubscription(final String firstChannel)
{
final JedisPubSub pubsub = this;
new Thread(new Runnable()
{
@Override
public void run()
{
boolean first = true;
while (!_destroyed)
{
if (!first)
{
log("Jedis connection to " + _readConn.getHost() + ":"
+ _readConn.getPort()
+ " failed or was interrupted, attempting to reconnect");
}
first = false;
Jedis jedisInstance = null;
try
{
// gets a non-thread-safe jedis instance from the thread-safe pool.
jedisInstance = _readPool.getResource();
log("Creating initial jedis subscription to channel " + firstChannel);
// this will block as long as there are subscriptions
jedisInstance.subscribe(pubsub, firstChannel);
log("jedisInstance.subscribe() returned, subscription over.");
// when the subscription ends (subscribe() returns), returns the instance to
// the pool
_readPool.returnResource(jedisInstance);
} catch (JedisConnectionException e)
{
log("Jedis connection encountered an issue.");
e.printStackTrace();
if (jedisInstance != null)
{
_readPool.returnBrokenResource((Jedis) jedisInstance);
}
} catch (JedisDataException e)
{
log("Jedis connection encountered an issue.");
e.printStackTrace();
if (jedisInstance != null)
{
_readPool.returnBrokenResource((Jedis) jedisInstance);
}
}
_subscribed = false;
// sleeps for a short pause, rather than constantly retrying connection
if (!_destroyed)
{
try
{
Thread.sleep(RECONNECT_PERIOD_MILLIS);
} catch (InterruptedException e)
{
_destroyed = true;
log("Reconnection pause thread was interrupted.");
e.printStackTrace();
}
}
}
}
}).start();
}
// This implementation does not support pattern-matching subscriptions
@Override
public void onPMessage(String pattern, String channel, String message)
{
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels)
{
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels)
{
}
private void log(String msg)
{
System.out.println(BOLD + "[" + getClass().getSimpleName()
+ (_id != null && !_id.isEmpty() ? " " + _id : "") + "] " + RESET + msg);
}
}

View File

@ -0,0 +1,61 @@
package mineplex.serverdata.redis.messaging;
import com.google.common.util.concurrent.ListenableFuture;
/**
* A multi-channel subscription and publisher to a pub/sub messaging implementation. An interface to
* the actual low-level pub/sub library, whatever it may be.
*
* For the sake of internal efficiency, this makes no guarantees for the sanity or unchangeability
* of arguments passed into its methods. Clients should not generally interact with this directly.
*/
public interface PubSubLibraryClient
{
/**
* Publishes a message to all subscribers of a given channel.
*
* @param channel The channel to publish the message on.
* @param message The message to send.
* @return A future object that will complete after an unknown amount of time with
* <code>false</code> if for some locally known reason the message definitely could not
* be published, else completes with <code>true</code>.
*/
ListenableFuture<Boolean> publish(String channel, String message);
/**
* Adds a channel to this subscription.
*
* @param channel The channel to add. Should not change after being passed in.
* @return The asynchronous, future result of this attempt to add the channel. Will have
* <code>true</code> when the subscription starts successfully.
*/
ListenableFuture<Boolean> addChannel(String channel);
/**
* Removes a channel from this subscription.
*
* @param channel The channel to remove. Should not change after being passed in.
*/
void removeChannel(String channel);
/**
* Removes all channels from this subscription, kills its connections, and relinquishes any
* resources it was occupying.
* <p>
* Depending on the implementation, once a subscription has been destroyed, it may not be
* reusable and it may be necessary to construct a new one in order to resume.
* <p>
* Call this when the subscription is no longer being used. Holding unnecessary connections can
* cause serious performance and other issues on both ends.
*/
void destroy();
/**
* Sets the subscriber to inform of messages received by this subscription.
*
* @param sub The listener for this subscription.
*/
void setSubscriber(Subscriber sub);
}

View File

@ -0,0 +1,56 @@
package mineplex.serverdata.redis.messaging;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Messager for standard pub/sub model. Handles multiple publishers and subscribers.
* <p>
* All messaging is asynchronous and non-blocking, even to local subscribers.
* <p>
* For more about the pub/sub messaging paradigm, see <a
* href="http://en.wikipedia.org/wiki/Publish%E2
* %80%93subscribe_pattern">http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern</a>
*/
public interface PubSubMessager
{
/**
* Publishes a message to all subscribers of a given channel.
* <p>
* Publishes to all connected subscribers, including local ones.
*
* @param channel The channel to publish the message on.
* @param message The message to send.
* @return A future object that will complete after an unknown amount of time with
* <code>false</code> if for some locally known reason the message definitely could not
* be published, else completes with <code>true</code> (which does not guarantee it
* succeeded 100%).
*/
ListenableFuture<Boolean> publish(String channel, String message);
/**
* Subscribes to a messaging channel.
* <p>
* When incoming messages arrive, the subscriber is called from an arbitrary new thread.
*
* @param channel The channel to subscribe to.
* @param sub The subscriber to inform of incoming messages.
* @return The asynchronous, future result of this attempt to subscribe to the channel. Will
* have <code>true</code> when the subscription starts successfully.
*/
ListenableFuture<Boolean> subscribe(String channel, Subscriber sub);
/**
* Unsubscribes from a messaging channel.
*
* @param channel The channel to unsubscribe from.
* @param sub The subscriber to stop informing of incoming messages.
*/
void unsubscribe(String channel, Subscriber sub);
/**
* Attempts to gracefully shut down this messager. Generally irreversible.
*/
void shutdown();
}

View File

@ -0,0 +1,165 @@
package mineplex.serverdata.redis.messaging;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
/**
* A pub/sub messager that simply routes messages to some underlying pub/sub implementation, which
* is in turn represented by a multi-channel subscription and a publishing mechanism.
* <p>
* This class handles:
* <ol>
* <li>Providing a modular messaging interface that is thread-safe.
* <li>Protecting pub/sub implementations from some bad client behavior/data.
* <li>Routing messages for multiple subscribers to the same channel(s).
* </ol>
*/
public class PubSubRouter implements PubSubMessager, Subscriber
{
private final PubSubLibraryClient _pubSubClient;
private final Map<String, Set<Subscriber>> _subscribers;
private final ExecutorService _threadPool;
public PubSubRouter(PubSubLibraryClient client)
{
if (client == null)
{
throw new IllegalArgumentException("pubsub client cannot be null");
}
this._pubSubClient = client;
this._pubSubClient.setSubscriber(this);
this._subscribers = new ConcurrentHashMap<String, Set<Subscriber>>();
this._threadPool = Executors.newCachedThreadPool();
}
@Override
public final ListenableFuture<Boolean> publish(String channel, String message)
{
if (channel == null || channel.isEmpty())
{
throw new IllegalArgumentException("channel cannot be null or empty");
}
// messages of 0 length are allowed. Null messages are treated as messages of 0 length.
if (message == null)
{
message = "";
}
return _pubSubClient.publish(channel, message);
}
@Override
public final ListenableFuture<Boolean> subscribe(String channel, Subscriber sub)
{
if (channel == null || channel.isEmpty() || sub == null)
{
throw new IllegalArgumentException("params cannot be null and channel cannot be empty");
}
ListenableFuture<Boolean> ret = null;
Set<Subscriber> channelSubs = _subscribers.get(channel);
if (channelSubs == null)
{
// uses CopyOnWriteArraySet for fast and consistent iteration (forwarding messages to
// subscribers) but slow writes (adding/removing subscribers).
// See a discussion of the issue here:
// http://stackoverflow.com/questions/6720396/different-types-of-thread-safe-sets-in-java
channelSubs = new CopyOnWriteArraySet<Subscriber>();
_subscribers.put(channel, channelSubs);
// starts a jedis subscription to the channel if there were no subscribers before
ret = _pubSubClient.addChannel(channel);
} else
{
ret = SettableFuture.create();
((SettableFuture<Boolean>) ret).set(true); // already subscribed, calls back immediately
}
channelSubs.add(sub);
return ret;
}
@Override
public final void unsubscribe(String channel, Subscriber sub)
{
if (channel == null || channel.isEmpty() || sub == null)
{
throw new IllegalArgumentException("params cannot be null and channel cannot be empty");
}
Set<Subscriber> channelSubs = _subscribers.get(channel);
if (channelSubs == null)
{ // no subscribers for the channel to begin with.
return;
}
channelSubs.remove(sub);
// stops the subscription to this channel if the unsubscribed was the last subscriber
if (channelSubs.isEmpty())
{
_subscribers.remove(channel);
_pubSubClient.removeChannel(channel);
}
}
@Override
public final void onMessage(final String channel, final String message)
{
if (channel == null || message == null || channel.isEmpty())
{
return;
}
Set<Subscriber> channelSubs = _subscribers.get(channel);
if (channelSubs == null)
{ // We should not still be listening
_pubSubClient.removeChannel(channel);
return;
} else if (channelSubs.isEmpty())
{
_subscribers.remove(channel);
_pubSubClient.removeChannel(channel);
return;
}
for (final Subscriber sub : channelSubs)
{
// Gives subscribers their own thread from the thread pool in which to react to the
// message.
// Avoids interruptions and other problems while iterating over the subscriber set.
_threadPool.execute(new Runnable()
{
@Override
public void run()
{
sub.onMessage(channel, message);
}
});
}
}
@Override
public void shutdown()
{
_pubSubClient.destroy();
}
}

View File

@ -0,0 +1,19 @@
package mineplex.serverdata.redis.messaging;
/**
* A subscriber to a pub/sub channel.
*/
public interface Subscriber
{
/**
* Called when a message is sent on a channel that this is subscribed to.
* <p>
* No guarantees are made about what thread this will be called from.
*
* @param channel The channel that the message was sent on.
* @param message The message that was received.
*/
void onMessage(String channel, String message);
}