Handle multiple multi-threaded Redis connections (and re-connections), improved logging, various refactors.

This commit is contained in:
Keir 2016-01-07 06:58:21 +00:00
parent 98a7b6d6d6
commit abc15e3a4c
8 changed files with 161 additions and 23 deletions

View File

@ -8,6 +8,7 @@
<element id="extracted-dir" path="$PROJECT_DIR$/Libraries/commons-pool2-2.2.jar" path-in-jar="/" /> <element id="extracted-dir" path="$PROJECT_DIR$/Libraries/commons-pool2-2.2.jar" path-in-jar="/" />
<element id="module-output" name="Mineplex.ServerData" /> <element id="module-output" name="Mineplex.ServerData" />
<element id="extracted-dir" path="$PROJECT_DIR$/Libraries/commons-cli-1.3.1.jar" path-in-jar="/" /> <element id="extracted-dir" path="$PROJECT_DIR$/Libraries/commons-cli-1.3.1.jar" path-in-jar="/" />
<element id="extracted-dir" path="$PROJECT_DIR$/Libraries/commons-lang3-3.1.jar" path-in-jar="/" />
</root> </root>
</artifact> </artifact>
</component> </component>

View File

@ -0,0 +1,9 @@
<component name="libraryTable">
<library name="commons-lang3-3.1">
<CLASSES>
<root url="jar://$PROJECT_DIR$/Libraries/commons-lang3-3.1.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</component>

View File

@ -10,6 +10,7 @@
<orderEntry type="library" name="gson" level="project" /> <orderEntry type="library" name="gson" level="project" />
<orderEntry type="library" name="commons-pool2" level="project" /> <orderEntry type="library" name="commons-pool2" level="project" />
<orderEntry type="library" name="commons-cli-1.3.1" level="project" /> <orderEntry type="library" name="commons-cli-1.3.1" level="project" />
<orderEntry type="library" name="commons-lang3-3.1" level="project" />
<orderEntry type="module" module-name="Mineplex.ServerData" /> <orderEntry type="module" module-name="Mineplex.ServerData" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>

View File

@ -8,14 +8,14 @@ import java.util.logging.Logger;
/** /**
* @author iKeirNez * @author iKeirNez
*/ */
public class PurgeTask implements Runnable public class FilePurger implements Runnable
{ {
private static final FileFilter FILE_FILTER = file -> file.isFile() && file.getName().endsWith(".json"); private static final FileFilter FILE_FILTER = file -> file.isFile() && file.getName().endsWith(".json");
private final File _dataDir; private final File _dataDir;
private final Logger _logger; private final Logger _logger;
public PurgeTask(File dataDir, Logger logger) public FilePurger(File dataDir, Logger logger)
{ {
_dataDir = dataDir; _dataDir = dataDir;
_logger = logger; _logger = logger;

View File

@ -7,6 +7,8 @@ import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -20,7 +22,7 @@ import redis.clients.jedis.JedisPubSub;
/** /**
* @author iKeirNez * @author iKeirNez
*/ */
public class JedisPubSubHandler extends JedisPubSub public class RedisCommandHandler extends JedisPubSub
{ {
private static final Gson _gson = new GsonBuilder() private static final Gson _gson = new GsonBuilder()
.setPrettyPrinting() .setPrettyPrinting()
@ -31,7 +33,7 @@ public class JedisPubSubHandler extends JedisPubSub
private File _directory; private File _directory;
private Logger _logger; private Logger _logger;
public JedisPubSubHandler(File directory, Logger logger) public RedisCommandHandler(File directory, Logger logger)
{ {
_directory = directory; _directory = directory;
_logger = logger; _logger = logger;

View File

@ -0,0 +1,107 @@
package mineplex.chatsnap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.time.DurationFormatUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
/**
* @author iKeirNez
*/
public class RedisConnectionHandler implements Runnable
{
private final String _name;
private final JedisPool _jedisPool;
private final RedisCommandHandler _handler;
private final String[] _channels;
private final Logger _logger;
private long _lastConnectionMillis = -1;
private Throwable _lastThrowable = null;
public RedisConnectionHandler(String name, JedisPool jedisPool, RedisCommandHandler handler, String[] channels, Logger logger)
{
if (channels.length == 0)
{
throw new IllegalArgumentException("Must provide at least one channel.");
}
_name = name;
_jedisPool = jedisPool;
_handler = handler;
_channels = channels;
_logger = logger;
}
@Override
public void run()
{
while (!Thread.interrupted())
{
try
{
registerChannelHandlers();
}
catch (Throwable e)
{
// Only log new errors (prevents same error being spammed)
if (_lastThrowable == null || !e.getClass().equals(_lastThrowable.getClass()))
{
if (_lastThrowable == null) // connection just failed
{
_lastConnectionMillis = System.currentTimeMillis();
}
_logger.log(Level.SEVERE, prefixMessage(
"Exception in Redis connection"
+ (_lastConnectionMillis != -1 ? " (no connection for " + getLastConnectionDuration() + ")" : "")
+ ", attempting to regain connection."
), e);
_lastThrowable = e;
}
try
{
Thread.sleep(1000 * 5);
}
catch (InterruptedException ignored) {}
}
}
_logger.warning("Thread interrupted, end of connection.");
}
private void registerChannelHandlers()
{
try (Jedis jedis = _jedisPool.getResource())
{
connectionEstablished();
jedis.subscribe(_handler, _channels);
}
}
private void connectionEstablished()
{
// subscribe blocks so we need to do all this before
_logger.info(
_lastThrowable == null
? prefixMessage("Connected.")
: prefixMessage(String.format("Connected after %s.", getLastConnectionDuration()))
);
_lastThrowable = null;
}
private String prefixMessage(String message)
{
return String.format("[%s] %s", _name, message);
}
private String getLastConnectionDuration()
{
return DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - _lastConnectionMillis, true, true);
}
}

View File

@ -8,6 +8,8 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import mineplex.serverdata.Utility; import mineplex.serverdata.Utility;
import mineplex.serverdata.redis.RedisConfig;
import mineplex.serverdata.servers.ConnectionData;
import mineplex.serverdata.servers.ServerManager; import mineplex.serverdata.servers.ServerManager;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
@ -15,7 +17,6 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException; import org.apache.commons.cli.ParseException;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPool;
/** /**
@ -26,8 +27,12 @@ public class ReportServer
public static final String CHANNEL_DEPLOY = "chatsnap:deploy"; public static final String CHANNEL_DEPLOY = "chatsnap:deploy";
public static final String CHANNEL_DESTROY = "chatsnap:destroy"; public static final String CHANNEL_DESTROY = "chatsnap:destroy";
private static final String[] CHANNELS = new String[]{CHANNEL_DEPLOY, CHANNEL_DESTROY};
public static void main(String[] args) public static void main(String[] args)
{ {
System.setProperty("java.util.logging.SimpleFormatter.format", "%4$s: %5$s%6$s%n"); // Nicer log output
Logger logger = Logger.getLogger("ReportServer"); Logger logger = Logger.getLogger("ReportServer");
logger.info("Starting report server."); logger.info("Starting report server.");
@ -53,7 +58,7 @@ public class ReportServer
dataDirectory = new File("data"); dataDirectory = new File("data");
} }
new ReportServer(Utility.generatePool(ServerManager.getMasterConnection()), dataDirectory, logger); new ReportServer(ServerManager.getDefaultConfig(), dataDirectory, logger);
} }
catch (ParseException e) catch (ParseException e)
{ {
@ -61,14 +66,14 @@ public class ReportServer
} }
} }
private JedisPool _jedisPool; private final File _dataDirectory;
private File _dataDirectory; private final Logger _logger;
private Logger _logger;
private ScheduledExecutorService _executorService = Executors.newScheduledThreadPool(1);
public ReportServer(JedisPool jedisPool, File dataDirectory, Logger logger) private final RedisCommandHandler _handler;
private final ScheduledExecutorService _executorService = Executors.newScheduledThreadPool(1);
public ReportServer(RedisConfig redisConfig, File dataDirectory, Logger logger)
{ {
_jedisPool = jedisPool;
_dataDirectory = dataDirectory; _dataDirectory = dataDirectory;
_logger = logger; _logger = logger;
@ -82,22 +87,27 @@ public class ReportServer
throw new RuntimeException("Unable to create directory: " + _dataDirectory.getPath()); throw new RuntimeException("Unable to create directory: " + _dataDirectory.getPath());
} }
registerHandler(); _handler = new RedisCommandHandler(_dataDirectory, _logger);
schedulePurgeTask();
_logger.info("Listening for commands on Redis."); initializeConnectionsConfig(redisConfig);
schedulePurgeTask();
} }
private void registerHandler() private void initializeConnectionsConfig(RedisConfig redisConfig)
{ {
try (Jedis jedis = _jedisPool.getResource()) redisConfig.getConnections(false, null).forEach(this::initializeConnection);
{ }
jedis.subscribe(new JedisPubSubHandler(_dataDirectory, _logger), CHANNEL_DEPLOY, CHANNEL_DESTROY);
} private void initializeConnection(ConnectionData connectionData)
{
JedisPool jedisPool = Utility.generatePool(connectionData);
Thread thread = new Thread(new RedisConnectionHandler(connectionData.getName(), jedisPool, _handler, CHANNELS, _logger));
thread.setDaemon(true);
thread.start();
} }
private void schedulePurgeTask() private void schedulePurgeTask()
{ {
_executorService.scheduleAtFixedRate(new PurgeTask(_dataDirectory, _logger), 0, 30, TimeUnit.MINUTES); _executorService.scheduleAtFixedRate(new FilePurger(_dataDirectory, _logger), 0, 30, TimeUnit.MINUTES);
} }
} }

View File

@ -75,7 +75,7 @@ public class ServerManager
public static ConnectionData getConnection(boolean writeable, String name) public static ConnectionData getConnection(boolean writeable, String name)
{ {
return getConfig(DEFAULT_CONFIG).getConnection(writeable, name); return getDefaultConfig().getConnection(writeable, name);
} }
/** /**
@ -86,7 +86,15 @@ public class ServerManager
{ {
return getConnection(writeable, "DefaultConnection"); return getConnection(writeable, "DefaultConnection");
} }
/**
* @return the default {@link RedisConfig} associated with this manager, providing appropriate connections.
*/
public static RedisConfig getDefaultConfig()
{
return getConfig(DEFAULT_CONFIG);
}
/** /**
* @return the {@link RedisConfig} associated with this manager, providing appropriate connections. * @return the {@link RedisConfig} associated with this manager, providing appropriate connections.
*/ */