Add proper thread safety. Please let me know if this deadlocks your server.
This commit is contained in:
parent
36f3a7a7f2
commit
b7ffd83675
@ -1,4 +1,4 @@
|
||||
From 8794e1bc1ae9773a9f7c9943c7696a76e66691aa Mon Sep 17 00:00:00 2001
|
||||
From 2624cc163bed8a94b851c7dbc186a304ce63bff7 Mon Sep 17 00:00:00 2001
|
||||
From: md_5 <md_5@live.com.au>
|
||||
Date: Sun, 3 Feb 2013 10:24:33 +1100
|
||||
Subject: [PATCH] Netty
|
||||
@ -17,13 +17,13 @@ This commit is licensed under the Creative Commons Attribution-ShareAlike 3.0 Un
|
||||
.../net/minecraft/server/PendingConnection.java | 15 +-
|
||||
.../net/minecraft/server/PlayerConnection.java | 2 +-
|
||||
src/main/java/org/spigotmc/netty/CipherCodec.java | 65 ++++++
|
||||
.../org/spigotmc/netty/NettyNetworkManager.java | 211 ++++++++++++++++++
|
||||
.../org/spigotmc/netty/NettyNetworkManager.java | 221 ++++++++++++++++++
|
||||
.../org/spigotmc/netty/NettyServerConnection.java | 105 +++++++++
|
||||
.../org/spigotmc/netty/NettySocketAdaptor.java | 248 +++++++++++++++++++++
|
||||
.../java/org/spigotmc/netty/PacketDecoder.java | 47 ++++
|
||||
.../java/org/spigotmc/netty/PacketEncoder.java | 43 ++++
|
||||
.../java/org/spigotmc/netty/PacketListener.java | 100 +++++++++
|
||||
11 files changed, 841 insertions(+), 7 deletions(-)
|
||||
11 files changed, 851 insertions(+), 7 deletions(-)
|
||||
create mode 100644 src/main/java/org/spigotmc/netty/CipherCodec.java
|
||||
create mode 100644 src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
||||
create mode 100644 src/main/java/org/spigotmc/netty/NettyServerConnection.java
|
||||
@ -208,10 +208,10 @@ index 0000000..cfc0535
|
||||
+}
|
||||
diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
||||
new file mode 100644
|
||||
index 0000000..effd1ee
|
||||
index 0000000..81aa42e
|
||||
--- /dev/null
|
||||
+++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java
|
||||
@@ -0,0 +1,211 @@
|
||||
@@ -0,0 +1,221 @@
|
||||
+package org.spigotmc.netty;
|
||||
+
|
||||
+import io.netty.channel.Channel;
|
||||
@ -247,8 +247,9 @@ index 0000000..effd1ee
|
||||
+ private static final PrivateKey key = server.F().getPrivate();
|
||||
+ private static final NettyServerConnection serverConnection = (NettyServerConnection) server.ae();
|
||||
+ /*========================================================================*/
|
||||
+ private Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>();
|
||||
+ private volatile Channel channel;
|
||||
+ private final Object mutex = new Object();
|
||||
+ private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>();
|
||||
+ private Channel channel;
|
||||
+ private SocketAddress address;
|
||||
+ private Connection handler;
|
||||
+ private SecretKey secret;
|
||||
@ -258,35 +259,41 @@ index 0000000..effd1ee
|
||||
+
|
||||
+ @Override
|
||||
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
+ // Channel and address groundwork first
|
||||
+ channel = ctx.channel();
|
||||
+ address = channel.remoteAddress();
|
||||
+ // Then the socket adaptor
|
||||
+ socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel);
|
||||
+ // Followed by their first handler
|
||||
+ handler = new PendingConnection(server, this);
|
||||
+ // Finally register the connection
|
||||
+ serverConnection.pendingConnections.add((PendingConnection) handler);
|
||||
+ synchronized (mutex) {
|
||||
+ // Channel and address groundwork first
|
||||
+ channel = ctx.channel();
|
||||
+ address = channel.remoteAddress();
|
||||
+ // Then the socket adaptor
|
||||
+ socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel);
|
||||
+ // Followed by their first handler
|
||||
+ handler = new PendingConnection(server, this);
|
||||
+ // Finally register the connection
|
||||
+ serverConnection.pendingConnections.add((PendingConnection) handler);
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
+ if (dcReason == null || dcArgs == null) {
|
||||
+ a("disconnect.endOfStream", new Object[0]);
|
||||
+ synchronized (mutex) {
|
||||
+ if (dcReason == null || dcArgs == null) {
|
||||
+ a("disconnect.endOfStream", new Object[0]);
|
||||
+ }
|
||||
+ // Remove channel reference to indicate we are done
|
||||
+ channel = null;
|
||||
+ }
|
||||
+ // Remove channel reference to indicate we are done
|
||||
+ channel = null;
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
+ // TODO: Remove this once we are more stable
|
||||
+ // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log =======================");
|
||||
+ // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause);
|
||||
+ // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log =======================");
|
||||
+ synchronized (mutex) {
|
||||
+ // TODO: Remove this once we are more stable
|
||||
+ // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log =======================");
|
||||
+ // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause);
|
||||
+ // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log =======================");
|
||||
+
|
||||
+ // Disconnect with generic reason + exception
|
||||
+ a("disconnect.genericReason", new Object[]{"Internal exception: " + cause});
|
||||
+ // Disconnect with generic reason + exception
|
||||
+ a("disconnect.genericReason", new Object[]{"Internal exception: " + cause});
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
@ -329,21 +336,20 @@ index 0000000..effd1ee
|
||||
+ * @param packet the packet to queue
|
||||
+ */
|
||||
+ public void queue(Packet packet) {
|
||||
+ // Only send if channel is still connected
|
||||
+ if (channel != null) {
|
||||
+ // Process packet via handler
|
||||
+ packet = PacketListener.callQueued(this, handler, packet);
|
||||
+ // If handler indicates packet send
|
||||
+ if (packet != null) {
|
||||
+ channel.write(packet);
|
||||
+ synchronized (mutex) {
|
||||
+ // Only send if channel is still connected
|
||||
+ if (channel != null) {
|
||||
+ // Process packet via handler
|
||||
+ packet = PacketListener.callQueued(this, handler, packet);
|
||||
+ // If handler indicates packet send
|
||||
+ if (packet != null) {
|
||||
+ channel.write(packet);
|
||||
+
|
||||
+ // If needed, check and prepare encryption phase
|
||||
+ if (packet instanceof Packet252KeyResponse) {
|
||||
+ BufferedBlockCipher encrypt = NettyServerConnection.getCipher(true, secret);
|
||||
+ BufferedBlockCipher decrypt = NettyServerConnection.getCipher(false, secret);
|
||||
+ CipherCodec codec = new CipherCodec(encrypt, decrypt);
|
||||
+ // Only add if the channel hasn't disconnected in the meantime
|
||||
+ if (channel != null) {
|
||||
+ // If needed, check and prepare encryption phase
|
||||
+ if (packet instanceof Packet252KeyResponse) {
|
||||
+ BufferedBlockCipher encrypt = NettyServerConnection.getCipher(true, secret);
|
||||
+ BufferedBlockCipher decrypt = NettyServerConnection.getCipher(false, secret);
|
||||
+ CipherCodec codec = new CipherCodec(encrypt, decrypt);
|
||||
+ channel.pipeline().addBefore("decoder", "cipher", codec);
|
||||
+ }
|
||||
+ }
|
||||
@ -392,8 +398,10 @@ index 0000000..effd1ee
|
||||
+ * close. Close and release all resources associated with this connection.
|
||||
+ */
|
||||
+ public void d() {
|
||||
+ if (channel != null) {
|
||||
+ channel.close();
|
||||
+ synchronized (mutex) {
|
||||
+ if (channel != null) {
|
||||
+ channel.close();
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
@ -416,10 +424,12 @@ index 0000000..effd1ee
|
||||
+ * exception which triggered the disconnect.
|
||||
+ */
|
||||
+ public void a(String reason, Object... arguments) {
|
||||
+ if (channel != null) {
|
||||
+ dcReason = reason;
|
||||
+ dcArgs = arguments;
|
||||
+ channel.close();
|
||||
+ synchronized (mutex) {
|
||||
+ if (channel != null) {
|
||||
+ dcReason = reason;
|
||||
+ dcArgs = arguments;
|
||||
+ channel.close();
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+}
|
||||
|
Loading…
Reference in New Issue
Block a user