From 632f1fe0d5566c3eb3df1f3ad1cd0619225e5f5f Mon Sep 17 00:00:00 2001 From: md_5 Date: Thu, 14 Feb 2013 17:32:20 +1100 Subject: [PATCH] Netty --- pom.xml | 5 + .../java/net/minecraft/server/DedicatedServer.java | 2 +- .../net/minecraft/server/Packet51MapChunk.java | 2 +- .../net/minecraft/server/Packet56MapChunkBulk.java | 2 +- .../net/minecraft/server/PendingConnection.java | 13 +- src/main/java/org/spigotmc/netty/CipherCodec.java | 65 ++++++ .../org/spigotmc/netty/NettyNetworkManager.java | 228 +++++++++++++++++++ .../org/spigotmc/netty/NettyServerConnection.java | 105 +++++++++ .../org/spigotmc/netty/NettySocketAdaptor.java | 248 +++++++++++++++++++++ .../java/org/spigotmc/netty/PacketDecoder.java | 63 ++++++ .../java/org/spigotmc/netty/PacketEncoder.java | 43 ++++ .../java/org/spigotmc/netty/PacketListener.java | 100 +++++++++ src/main/java/org/spigotmc/netty/ReadState.java | 16 ++ 13 files changed, 885 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/spigotmc/netty/CipherCodec.java create mode 100644 src/main/java/org/spigotmc/netty/NettyNetworkManager.java create mode 100644 src/main/java/org/spigotmc/netty/NettyServerConnection.java create mode 100644 src/main/java/org/spigotmc/netty/NettySocketAdaptor.java create mode 100644 src/main/java/org/spigotmc/netty/PacketDecoder.java create mode 100644 src/main/java/org/spigotmc/netty/PacketEncoder.java create mode 100644 src/main/java/org/spigotmc/netty/PacketListener.java create mode 100644 src/main/java/org/spigotmc/netty/ReadState.java diff --git a/pom.xml b/pom.xml index f17bd19..6b314ec 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,11 @@ trove4j 3.0.2 + + io.netty + netty-all + 4.0.0.Beta1 + diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java index bd0377a..68feb71 100644 --- a/src/main/java/net/minecraft/server/DedicatedServer.java +++ b/src/main/java/net/minecraft/server/DedicatedServer.java @@ -93,7 +93,7 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer log.info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.G()); try { - this.r = new DedicatedServerConnection(this, inetaddress, this.G()); + this.r = new org.spigotmc.netty.NettyServerConnection(this, inetaddress, this.G()); } catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable log.warning("**** FAILED TO BIND TO PORT!"); log.log(Level.WARNING, "The exception was: " + ioexception.toString()); diff --git a/src/main/java/net/minecraft/server/Packet51MapChunk.java b/src/main/java/net/minecraft/server/Packet51MapChunk.java index 230dd62..2ba0464 100644 --- a/src/main/java/net/minecraft/server/Packet51MapChunk.java +++ b/src/main/java/net/minecraft/server/Packet51MapChunk.java @@ -42,7 +42,7 @@ public class Packet51MapChunk extends Packet { this.b = chunk.z; this.e = flag; ChunkMap chunkmap = a(chunk, flag, i); - Deflater deflater = new Deflater(-1); + Deflater deflater = new Deflater(4); this.d = chunkmap.c; this.c = chunkmap.b; diff --git a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java index 9d5cee7..8486d82 100644 --- a/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java +++ b/src/main/java/net/minecraft/server/Packet56MapChunkBulk.java @@ -24,7 +24,7 @@ public class Packet56MapChunkBulk extends Packet { @Override protected Deflater initialValue() { // Don't use higher compression level, slows things down too much - return new Deflater(6); + return new Deflater(4); } }; // CraftBukkit end diff --git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java index 8413a15..70fe839 100644 --- a/src/main/java/net/minecraft/server/PendingConnection.java +++ b/src/main/java/net/minecraft/server/PendingConnection.java @@ -17,7 +17,7 @@ public class PendingConnection extends Connection { private byte[] d; public static Logger logger = Logger.getLogger("Minecraft"); private static Random random = new Random(); - public NetworkManager networkManager; + public org.spigotmc.netty.NettyNetworkManager networkManager; public boolean c = false; private MinecraftServer server; private int g = 0; @@ -28,10 +28,15 @@ public class PendingConnection extends Connection { private SecretKey l = null; public String hostname = ""; // CraftBukkit - add field + public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) { + this.server = minecraftserver; + this.networkManager = networkManager; + } + public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException this.server = minecraftserver; - this.networkManager = new NetworkManager(socket, s, this, minecraftserver.F().getPrivate()); - this.networkManager.e = 0; + // this.networkManager = new NetworkManager(socket, s, this, minecraftserver.F().getPrivate()); + // this.networkManager.e = 0; } // CraftBukkit start @@ -147,7 +152,7 @@ public class PendingConnection extends Connection { // CraftBukkit org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers()); - if (packet254getinfo.a == 1) { + if (true) { // CraftBukkit start - fix decompile issues, don't create a list from an array Object[] list = new Object[] { 1, 51, this.server.getVersion(), pingEvent.getMotd(), playerlist.getPlayerCount(), pingEvent.getMaxPlayers() }; diff --git a/src/main/java/org/spigotmc/netty/CipherCodec.java b/src/main/java/org/spigotmc/netty/CipherCodec.java new file mode 100644 index 0000000..f25af14 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherCodec.java @@ -0,0 +1,65 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToByteCodec; +import org.bouncycastle.crypto.BufferedBlockCipher; + +/** + * This class is a complete solution for encrypting and decoding bytes in a + * Netty stream. It takes two {@link BufferedBlockCipher} instances, used for + * encryption and decryption respectively. + */ +public class CipherCodec extends ByteToByteCodec { + + private BufferedBlockCipher encrypt; + private BufferedBlockCipher decrypt; + private ByteBuf heapOut; + + public CipherCodec(BufferedBlockCipher encrypt, BufferedBlockCipher decrypt) { + this.encrypt = encrypt; + this.decrypt = decrypt; + } + + @Override + public void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + if (heapOut == null) { + heapOut = ctx.alloc().heapBuffer(); + } + cipher(encrypt, in, heapOut); + out.writeBytes(heapOut); + heapOut.discardSomeReadBytes(); + } + + @Override + public void decode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { + cipher(decrypt, in, out); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeInboundBuffer(ctx); + decrypt = null; + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeOutboundBuffer(ctx); + if (heapOut != null) { + heapOut.release(); + heapOut = null; + } + decrypt = null; + } + + private void cipher(BufferedBlockCipher cipher, ByteBuf in, ByteBuf out) { + int available = in.readableBytes(); + int outputSize = cipher.b(available); // getUpdateOutputSize + if (out.capacity() < outputSize) { + out.capacity(outputSize); + } + int processed = cipher.a(in.array(), in.arrayOffset() + in.readerIndex(), available, out.array(), out.arrayOffset() + out.writerIndex()); // processBytes + in.readerIndex(in.readerIndex() + processed); + out.writerIndex(out.writerIndex() + processed); + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 index 0000000..adfd877 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java @@ -0,0 +1,228 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SocketChannel; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet252KeyResponse; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.PlayerConnection; +import org.bouncycastle.crypto.BufferedBlockCipher; + +/** + * This class forms the basis of the Netty integration. It implements + * {@link INetworkManager} and handles all events and inbound messages provided + * by the upstream Netty process. + */ +public class NettyNetworkManager extends ChannelInboundMessageHandlerAdapter implements INetworkManager { + + private static final ExecutorService threadPool = Executors.newCachedThreadPool(); + private static final MinecraftServer server = MinecraftServer.getServer(); + private static final PrivateKey key = server.F().getPrivate(); + private static final NettyServerConnection serverConnection = (NettyServerConnection) server.ae(); + /*========================================================================*/ + private final Queue syncPackets = new ConcurrentLinkedQueue(); + private final List highPriorityQueue = new AbstractList() { + @Override + public void add(int index, Packet element) { + // NOP + } + + @Override + public Packet get(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int size() { + return 0; + } + }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; + private Connection connection; + private SecretKey secret; + private String dcReason; + private Object[] dcArgs; + private Socket socketAdaptor; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Channel and address groundwork first + channel = ctx.channel(); + address = channel.remoteAddress(); + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt((SocketChannel) channel); + // Followed by their first handler + connection = new PendingConnection(server, this); + // Finally register the connection + connected = true; + serverConnection.pendingConnections.add((PendingConnection) connection); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + a("disconnect.endOfStream", new Object[0]); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // TODO: Remove this once we are more stable + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Disconnect with generic reason + exception + a("disconnect.genericReason", new Object[]{"Internal exception: " + cause}); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, final Packet msg) throws Exception { + if (msg instanceof Packet252KeyResponse) { + secret = ((Packet252KeyResponse) msg).a(key); + } + + if (msg.a_()) { + threadPool.submit(new Runnable() { + public void run() { + Packet packet = PacketListener.callReceived(NettyNetworkManager.this, connection, msg); + if (packet != null) { + packet.handle(connection); + } + } + }); + } else { + syncPackets.add(msg); + } + } + + public Socket getSocket() { + return socketAdaptor; + } + + /** + * setHandler. Set the {@link NetHandler} used to process received packets. + * + * @param nh the new {@link NetHandler} instance + */ + public void a(Connection nh) { + connection = nh; + } + + /** + * queue. Queue a packet for sending, or in this case send it to be write it + * straight to the channel. + * + * @param packet the packet to queue + */ + public void queue(Packet packet) { + // Only send if channel is still connected + if (connected) { + // Process packet via handler + packet = PacketListener.callQueued(this, connection, packet); + // If handler indicates packet send + if (packet != null) { + highPriorityQueue.add(packet); + channel.write(packet); + + // If needed, check and prepare encryption phase + if (packet instanceof Packet252KeyResponse) { + BufferedBlockCipher encrypt = NettyServerConnection.getCipher(true, secret); + BufferedBlockCipher decrypt = NettyServerConnection.getCipher(false, secret); + CipherCodec codec = new CipherCodec(encrypt, decrypt); + channel.pipeline().addBefore("decoder", "cipher", codec); + } + } + } + } + + /** + * wakeThreads. In Vanilla this method will interrupt the network read and + * write threads, thus waking them. + */ + public void a() { + } + + /** + * processPackets. Remove up to 1000 packets from the queue and process + * them. This method should only be called from the main server thread. + */ + public void b() { + for (int i = 1000; !syncPackets.isEmpty() && i >= 0; i--) { + if (connection instanceof PendingConnection ? ((PendingConnection) connection).c : ((PlayerConnection) connection).disconnected) { + syncPackets.clear(); + break; + } + + Packet packet = PacketListener.callReceived(this, connection, syncPackets.poll()); + if (packet != null) { + packet.handle(connection); + } + } + + // Disconnect via the handler - this performs all plugin related cleanup + logging + if (!connected && (dcReason != null || dcArgs != null)) { + connection.a(dcReason, dcArgs); + } + } + + /** + * getSocketAddress. Return the remote address of the connected user. It is + * important that this method returns a value even after disconnect. + * + * @return the remote address of this connection + */ + public SocketAddress getSocketAddress() { + return address; + } + + /** + * close. Close and release all resources associated with this connection. + */ + public void d() { + if (connected) { + connected = false; + channel.close(); + } + } + + /** + * queueSize. Return the number of packets in the low priority queue. In a + * NIO environment this will always be 0. + * + * @return the size of the packet send queue + */ + public int e() { + return 0; + } + + /** + * networkShutdown. Shuts down this connection, storing the reason and + * parameters, used to notify the current {@link Connection}. + * + * @param reason the main disconnect reason + * @param arguments additional disconnect arguments, for example, the + * exception which triggered the disconnect. + */ + public void a(String reason, Object... arguments) { + if (connected) { + dcReason = reason; + dcArgs = arguments; + d(); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 index 0000000..4f08e23 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java @@ -0,0 +1,105 @@ +package org.spigotmc.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; +import java.security.Key; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; +import org.bouncycastle.crypto.BufferedBlockCipher; +import org.bouncycastle.crypto.engines.AESFastEngine; +import org.bouncycastle.crypto.modes.CFBBlockCipher; +import org.bouncycastle.crypto.params.KeyParameter; +import org.bouncycastle.crypto.params.ParametersWithIV; +import org.bukkit.Bukkit; + +/** + * This is the NettyServerConnection class. It implements + * {@link ServerConnection} and is the main interface between the Minecraft + * server and this NIO implementation. It handles starting, stopping and + * processing the Netty backend. + */ +public class NettyServerConnection extends ServerConnection { + + private final ChannelFuture socket; + final List pendingConnections = Collections.synchronizedList(new ArrayList()); + + public NettyServerConnection(MinecraftServer ms, InetAddress host, int port) { + super(ms); + socket = new ServerBootstrap().channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() { + @Override + public void initChannel(Channel ch) throws Exception { + try { + ch.config().setOption(ChannelOption.IP_TOS, 0x18); + } catch (ChannelException ex) { + // IP_TOS is not supported (Windows XP / Windows Server 2003) + } + + ch.pipeline() + .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("decoder", new PacketDecoder()) + .addLast("encoder", new PacketEncoder()) + .addLast("manager", new NettyNetworkManager()); + } + }).group(new NioEventLoopGroup(3)).localAddress(host, port).bind(); + } + + /** + * Pulse. This method pulses all connections causing them to update. It is + * called from the main server thread a few times a tick. + */ + @Override + public void b() { + super.b(); // pulse PlayerConnections + for (int i = 0; i < pendingConnections.size(); ++i) { + PendingConnection connection = pendingConnections.get(i); + + try { + connection.c(); + } catch (Exception ex) { + connection.disconnect("Internal server error"); + Bukkit.getServer().getLogger().log(Level.WARNING, "Failed to handle packet: " + ex, ex); + } + + if (connection.c) { + pendingConnections.remove(i--); + } + } + } + + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further + * action. + */ + @Override + public void a() { + socket.channel().close().syncUninterruptibly(); + } + + /** + * Return a Minecraft compatible cipher instance from the specified key. + * + * @param forEncryption whether the returned cipher shall be usable for + * encryption or decryption + * @param key to use as the initial vector + * @return the initialized cipher + */ + public static BufferedBlockCipher getCipher(boolean forEncryption, Key key) { + BufferedBlockCipher cip = new BufferedBlockCipher(new CFBBlockCipher(new AESFastEngine(), 8)); + cip.a(forEncryption, new ParametersWithIV(new KeyParameter(key.getEncoded()), key.getEncoded(), 0, 16)); + return cip; + } +} diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java new file mode 100644 index 0000000..a3b86b8 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java @@ -0,0 +1,248 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.SocketChannel; + +/** + * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides + * all methods in {@link Socket} to ensure that calls are not mistakingly made + * to the unsupported super socket. All operations that can be sanely applied to + * a {@link Channel} are implemented here. Those which cannot will throw an + * {@link UnsupportedOperationException}. + */ +public class NettySocketAdaptor extends Socket { + + private final io.netty.channel.socket.SocketChannel ch; + + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) { + this.ch = ch; + } + + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) { + return new NettySocketAdaptor(ch); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException { + ch.bind(bindpoint).syncUninterruptibly(); + } + + @Override + public synchronized void close() throws IOException { + ch.close().syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint) throws IOException { + ch.connect(endpoint).syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException { + ch.config().setConnectTimeoutMillis(timeout); + ch.connect(endpoint).syncUninterruptibly(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof NettySocketAdaptor && ch.equals(((NettySocketAdaptor) obj).ch); + } + + @Override + public SocketChannel getChannel() { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public InetAddress getInetAddress() { + return ch.remoteAddress().getAddress(); + } + + @Override + public InputStream getInputStream() throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public boolean getKeepAlive() throws SocketException { + return ch.config().getOption(ChannelOption.SO_KEEPALIVE); + } + + @Override + public InetAddress getLocalAddress() { + return ch.localAddress().getAddress(); + } + + @Override + public int getLocalPort() { + return ch.localAddress().getPort(); + } + + @Override + public SocketAddress getLocalSocketAddress() { + return ch.localAddress(); + } + + @Override + public boolean getOOBInline() throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public OutputStream getOutputStream() throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public int getPort() { + return ch.remoteAddress().getPort(); + } + + @Override + public synchronized int getReceiveBufferSize() throws SocketException { + return ch.config().getOption(ChannelOption.SO_RCVBUF); + } + + @Override + public SocketAddress getRemoteSocketAddress() { + return ch.remoteAddress(); + } + + @Override + public boolean getReuseAddress() throws SocketException { + return ch.config().getOption(ChannelOption.SO_REUSEADDR); + } + + @Override + public synchronized int getSendBufferSize() throws SocketException { + return ch.config().getOption(ChannelOption.SO_SNDBUF); + } + + @Override + public int getSoLinger() throws SocketException { + return ch.config().getOption(ChannelOption.SO_LINGER); + } + + @Override + public synchronized int getSoTimeout() throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public boolean getTcpNoDelay() throws SocketException { + return ch.config().getOption(ChannelOption.TCP_NODELAY); + } + + @Override + public int getTrafficClass() throws SocketException { + return ch.config().getOption(ChannelOption.IP_TOS); + } + + @Override + public int hashCode() { + return ch.hashCode(); + } + + @Override + public boolean isBound() { + return ch.localAddress() != null; + } + + @Override + public boolean isClosed() { + return !ch.isOpen(); + } + + @Override + public boolean isConnected() { + return ch.isActive(); + } + + @Override + public boolean isInputShutdown() { + return ch.isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() { + return ch.isOutputShutdown(); + } + + @Override + public void sendUrgentData(int data) throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void setKeepAlive(boolean on) throws SocketException { + ch.config().setOption(ChannelOption.SO_KEEPALIVE, on); + } + + @Override + public void setOOBInline(boolean on) throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public synchronized void setReceiveBufferSize(int size) throws SocketException { + ch.config().setOption(ChannelOption.SO_RCVBUF, size); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException { + ch.config().setOption(ChannelOption.SO_REUSEADDR, on); + } + + @Override + public synchronized void setSendBufferSize(int size) throws SocketException { + ch.config().setOption(ChannelOption.SO_SNDBUF, size); + } + + @Override + public void setSoLinger(boolean on, int linger) throws SocketException { + ch.config().setOption(ChannelOption.SO_LINGER, linger); + } + + @Override + public synchronized void setSoTimeout(int timeout) throws SocketException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void setTcpNoDelay(boolean on) throws SocketException { + ch.config().setOption(ChannelOption.TCP_NODELAY, on); + } + + @Override + public void setTrafficClass(int tc) throws SocketException { + ch.config().setOption(ChannelOption.IP_TOS, tc); + } + + @Override + public void shutdownInput() throws IOException { + throw new UnsupportedOperationException("Operation not supported on Channel wrapper."); + } + + @Override + public void shutdownOutput() throws IOException { + ch.shutdownOutput().syncUninterruptibly(); + } + + @Override + public String toString() { + return ch.toString(); + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java new file mode 100644 index 0000000..6ecbca7 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java @@ -0,0 +1,63 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ReplayingDecoder; +import java.io.DataInputStream; +import java.io.EOFException; +import java.io.IOException; +import net.minecraft.server.Packet; + +/** + * Packet decoding class backed by a reusable {@link DataInputStream} which + * backs the input {@link ByteBuf}. Reads an unsigned byte packet header and + * then decodes the packet accordingly. + */ +public class PacketDecoder extends ReplayingDecoder { + + private DataInputStream input; + private Packet packet; + + public PacketDecoder() { + super(ReadState.HEADER); + } + + @Override + public Packet decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + if (input == null) { + input = new DataInputStream(new ByteBufInputStream(in)); + } + + switch (state()) { + case HEADER: + short packetId = in.readUnsignedByte(); + packet = Packet.d(packetId); + if (packet == null) { + throw new IOException("Bad packet id " + packetId); + } + checkpoint(ReadState.DATA); + case DATA: + try { + packet.a(input); + } catch (EOFException ex) { + return null; + } + + checkpoint(ReadState.HEADER); + Packet ret = packet; + packet = null; + + return ret; + default: + throw new IllegalStateException(); + } + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeInboundBuffer(ctx); + input = null; + packet = null; + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketEncoder.java b/src/main/java/org/spigotmc/netty/PacketEncoder.java new file mode 100644 index 0000000..9d0b06c --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketEncoder.java @@ -0,0 +1,43 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import java.io.DataOutputStream; +import net.minecraft.server.Packet; + +/** + * Netty encoder which takes a packet and encodes it, and adds a byte packet id + * header. + */ +public class PacketEncoder extends MessageToByteEncoder { + + private ByteBuf outBuf; + private DataOutputStream dataOut; + + @Override + public void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception { + if (outBuf == null) { + outBuf = ctx.alloc().directBuffer(); + } + if (dataOut == null) { + dataOut = new DataOutputStream(new ByteBufOutputStream(outBuf)); + } + + out.writeByte(msg.k()); + msg.a(dataOut); + out.writeBytes(outBuf); + out.discardSomeReadBytes(); + } + + @Override + public void freeOutboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeOutboundBuffer(ctx); + if (outBuf != null) { + outBuf.release(); + outBuf = null; + } + dataOut = null; + } +} diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java new file mode 100644 index 0000000..8e3b932 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/PacketListener.java @@ -0,0 +1,100 @@ +package org.spigotmc.netty; + +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.Packet; +import org.bukkit.Bukkit; +import org.bukkit.plugin.Plugin; + +/** + * This class is used for plugins that wish to register to listen to incoming + * and outgoing packets. To use this class, simply create a new instance, + * override the methods you wish to use, and call + * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}. + */ +public class PacketListener { + + /** + * A mapping of all registered listeners and their owning plugins. + */ + private static final Map listeners = new HashMap(); + /** + * A baked list of all listeners, for efficiency sake. + */ + private static PacketListener[] baked = new PacketListener[0]; + + /** + * Used to register a handler for receiving notifications of packet + * activity. + * + * @param listener the listener to register + * @param plugin the plugin owning this listener + */ + public static synchronized void register(PacketListener listener, Plugin plugin) { + Preconditions.checkNotNull(listener, "listener"); + Preconditions.checkNotNull(plugin, "plugin"); + Preconditions.checkState(!listeners.containsKey(listener), "listener already registered"); + + int size = listeners.size(); + Preconditions.checkState(baked.length == size); + listeners.put(listener, plugin); + baked = Arrays.copyOf(baked, size + 1); + baked[size] = listener; + } + + static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet) { + for (PacketListener listener : baked) { + try { + packet = listener.packetReceived(networkManager, connection, packet); + } catch (Throwable t) { + Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing receive hook for packet", t); + } + } + return packet; + } + + static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet) { + for (PacketListener listener : baked) { + try { + packet = listener.packetQueued(networkManager, connection, packet); + } catch (Throwable t) { + Bukkit.getServer().getLogger().log(Level.SEVERE, "Error whilst firing queued hook for packet", t); + } + } + return packet; + } + + /** + * Called when a packet has been received and is about to be handled by the + * current {@link Connection}. The returned packet will be the packet passed + * on for handling, or in the case of null being returned, not handled at + * all. + * + * @param networkManager the NetworkManager receiving the packet + * @param connection the connection which will handle the packet + * @param packet the received packet + * @return the packet to be handled, or null to cancel + */ + public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet) { + return packet; + } + + /** + * Called when a packet is queued to be sent. The returned packet will be + * the packet sent. In the case of null being returned, the packet will not + * be sent. + * + * @param networkManager the NetworkManager which will send the packet + * @param connection the connection which queued the packet + * @param packet the queue packet + * @return the packet to be sent, or null if the packet will not be sent. + */ + public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet) { + return packet; + } +} diff --git a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java new file mode 100644 index 0000000..5dc3754 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/ReadState.java @@ -0,0 +1,16 @@ +package org.spigotmc.netty; + +/** + * Stores the state of the packet currently being read. + */ +public enum ReadState { + + /** + * Indicates the byte representing the ID has been read. + */ + HEADER, + /** + * Shows the packet body is being read. + */ + DATA; +} -- 1.8.1-rc2