Netty介绍与认识
-
概述
Netty是由JBOSS提供的一个java开源框架。
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
2.体系结构图
Netty的核心结构
Netty是典型的Reactor模型结构,在实现上,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认NioWorker的个数是当前服务器的可用核数)。
在处理新来的请求时,NioWorker读完已收到的数据到ChannelBuffer中,之后触发ChannelPipeline中的ChannelHandler流。
Netty是事件驱动的,可以通过ChannelHandler链来控制执行流向。因为ChannelHandler链的执行过程是在subReactor中同步的,所以如果业务处理handler耗时长,将严重影响可支持的并发数。
发一下Netty的架构
Server1
public static void main(String[] args) throws Exception {
//启动server服务
new NettyServer().bind(8089);
}
NettyServer
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //bossGroup就是parentGroup,是负责处理TCP/IP连接的
EventLoopGroup workerGroup = new NioEventLoopGroup(); //workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) //初始化服务端可连接队列,指定了队列的大小128
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持长连接
.childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
@Override
protected void initChannel(SocketChannel sh) throws Exception {
// sh.pipeline().addLast(new StringEncoder(Charset.forName("gbk")));
// sh.pipeline().addLast(new StringDecoder(Charset.forName("gbk")));
sh.pipeline().addLast("decoder", new MyDecode());
sh.pipeline() .addLast(new ServerHandler()); //使用ServerHandler类来处理接收到的消息
}
});
//绑定监听端口,调用sync同步阻塞方法等待绑定操作完
ChannelFuture future = sb.bind(port).sync();
if (future.isSuccess()) {
System.out.println("服务端启动成功");
} else {
System.out.println("服务端启动失败");
future.cause().printStackTrace();
bossGroup.shutdownGracefully(); //关闭线程组
workerGroup.shutdownGracefully();
}
//成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
future.channel().closeFuture().sync();
}
MyDecode 是我自己编写的一个解析16进制的类,编码器
public class MyDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List
ServerHandler
代码就上了,这样16进制和读不到的问题就解决了。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
try {
System.out.println(msg.toString);
} catch (Exception e) {
e.printStackTrace();
System.err.println(e.getMessage());
}
}
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// RpcRequest request = (RpcRequest) msg;
// System.out.println("接收到客户端信息:" + request.toString());
// //返回的数据结构
// RpcResponse response = new RpcResponse();
// response.setId(UUID.randomUUID().toString());
// response.setData("server响应结果");
// response.setStatus(1);
// ctx.writeAndFlush(response);
// }
//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//读操作时捕获到异常时调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
//客户端去和服务端连接成功时触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
try {
int strLen = 34;
StringBuffer sb = null;
while (classpath.length() < strLen) {
sb = new StringBuffer();
//sb.append("0").append(_16);// 左补0
sb.append(classpath).append("0");//右补0
classpath = sb.toString();
}
System.err.println("sb=" + sb);
String str = "ADBA000160002546323160ACC70014CB142701" + classpath + "FE";
String byte16 = "ADBA000120000B020195FE";
ByteBuf bytebuf = Unpooled.buffer(16);
ByteBuf bytebuf1 = Unpooled.buffer(16);
byte[] bytes = MyDecode.hexString2Bytes(byte16);
byte[] bytes1 = MyDecode.hexString2Bytes(str);
bytebuf1.writeBytes(bytes1);
bytebuf.writeBytes(bytes);
ctx.writeAndFlush(bytebuf);
for (int i = 0; i < 2; i++) {
if (i == 1) {
ctx.writeAndFlush(bytebuf + "\n");
} else {
//ctx.writeAndFlush(bytebuf1 + "\n");
}
}
ctx.flush();
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getMessage());
}
}