概述
Netty是由JBOSS提供的一个java开源框架。 Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
2.体系结构图
Netty的核心结构
Netty是典型的Reactor模型结构,在实现上,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认NioWorker的个数是当前服务器的可用核数)。
在处理新来的请求时,NioWorker读完已收到的数据到ChannelBuffer中,之后触发ChannelPipeline中的ChannelHandler流。
Netty是事件驱动的,可以通过ChannelHandler链来控制执行流向。因为ChannelHandler链的执行过程是在subReactor中同步的,所以如果业务处理handler耗时长,将严重影响可支持的并发数。
发一下Netty的架构
Server1
public static void main(String[] args) throws Exception { //启动server服务 new NettyServer().bind(8089); }
NettyServer
public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); //bossGroup就是parentGroup,是负责处理TCP/IP连接的 EventLoopGroup workerGroup = new NioEventLoopGroup(); //workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件 ServerBootstrap sb = new ServerBootstrap(); sb.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) //初始化服务端可连接队列,指定了队列的大小128 .childOption(ChannelOption.SO_KEEPALIVE, true) //保持长连接 .childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作 @Override protected void initChannel(SocketChannel sh) throws Exception { // sh.pipeline().addLast(new StringEncoder(Charset.forName("gbk"))); // sh.pipeline().addLast(new StringDecoder(Charset.forName("gbk"))); sh.pipeline().addLast("decoder", new MyDecode()); sh.pipeline() .addLast(new ServerHandler()); //使用ServerHandler类来处理接收到的消息 } }); //绑定监听端口,调用sync同步阻塞方法等待绑定操作完 ChannelFuture future = sb.bind(port).sync(); if (future.isSuccess()) { System.out.println("服务端启动成功"); } else { System.out.println("服务端启动失败"); future.cause().printStackTrace(); bossGroup.shutdownGracefully(); //关闭线程组 workerGroup.shutdownGracefully(); } //成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。 future.channel().closeFuture().sync(); }
MyDecode 是我自己编写的一个解析16进制的类,编码器
public class MyDecode extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { byte[] b = new byte[byteBuf.readableBytes()]; //复制内容到字节数组b byteBuf.readBytes(b); //字节数组转字符串 String str = new String(b); list.add(bytesToHexString(b)); } public String bytesToHexString(byte[] bArray) { StringBuffer sb = new StringBuffer(bArray.length); String sTemp; for (int i = 0; i < bArray.length; i++) { sTemp = Integer.toHexString(0xFF & bArray[i]); if (sTemp.length() < 2) sb.append(0); sb.append(sTemp.toUpperCase()); } return sb.toString(); } public static String toHexString1(byte[] b) { StringBuffer buffer = new StringBuffer(); for (int i = 0; i < b.length; ++i) { buffer.append(toHexString1(b[i])); } return buffer.toString(); } public static String toHexString1(byte b) { String s = Integer.toHexString(b & 0xFF); if (s.length() == 1) { return "0" + s; } else { return s; } } public static byte[] hexString2Bytes(String hex) { if ((hex == null) || (hex.equals(""))){ return null; }else if (hex.length()%2 != 0){ return null; }else{ hex = hex.toUpperCase(); int len = hex.length()/2; byte[] b = new byte[len]; char[] hc = hex.toCharArray(); for (int i=0; i int p=2*i; b[i] = (byte) (charToByte(hc[p]) << 4 | charToByte(hc[p+1])); } return b; } } private static byte charToByte(char c) { return (byte) "0123456789ABCDEF".indexOf(c); } public static void main(String[] args) throws Exception { String fan="烦"; String str = URLEncoder.encode(fan, "utf-8").replaceAll("%", ""); System.err.println(str); } }
ServerHandler
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException { try { System.out.println(msg.toString); } catch (Exception e) { e.printStackTrace(); System.err.println(e.getMessage()); } } // @Override // public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // RpcRequest request = (RpcRequest) msg; // System.out.println("接收到客户端信息:" + request.toString()); // //返回的数据结构 // RpcResponse response = new RpcResponse(); // response.setId(UUID.randomUUID().toString()); // response.setData("server响应结果"); // response.setStatus(1); // ctx.writeAndFlush(response); // } //通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } //读操作时捕获到异常时调用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } //客户端去和服务端连接成功时触发 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { try { int strLen = 34; StringBuffer sb = null; while (classpath.length() < strLen) { sb = new StringBuffer(); //sb.append("0").append(_16);// 左补0 sb.append(classpath).append("0");//右补0 classpath = sb.toString(); } System.err.println("sb=" + sb); String str = "ADBA000160002546323160ACC70014CB142701" + classpath + "FE"; String byte16 = "ADBA000120000B020195FE"; ByteBuf bytebuf = Unpooled.buffer(16); ByteBuf bytebuf1 = Unpooled.buffer(16); byte[] bytes = MyDecode.hexString2Bytes(byte16); byte[] bytes1 = MyDecode.hexString2Bytes(str); bytebuf1.writeBytes(bytes1); bytebuf.writeBytes(bytes); ctx.writeAndFlush(bytebuf); for (int i = 0; i < 2; i++) { if (i == 1) { ctx.writeAndFlush(bytebuf + "\n"); } else { //ctx.writeAndFlush(bytebuf1 + "\n"); } } ctx.flush(); } catch (Exception e) { e.printStackTrace(); System.out.println(e.getMessage()); } }
代码就上了,这样16进制和读不到的问题就解决了。