Netty 实战
NIO编程
关于 NIO 相关的文章网上也有很多,这里不打算详细深入分析,下面简单描述一下 NIO 是如何解决以上三个问题的。
线程资源受限
NIO 编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,那么他是怎么做到的?我们用一幅图来对比一下 IO 与 NIO
如上图所示,IO 模型中,一个连接来了,会创建一个线程,对应一个 while 死循环,死循环的目的就是不断监测这条连接上是否有数据可以读,大多数情况下,1w 个连接里面同一时刻只有少量的连接有数据可读,因此,很多个 while 死循环都白白浪费掉了,因为读不出啥数据。
而在 NIO 模型中,他把这么多 while 死循环变成一个死循环,这个死循环由一个线程控制,那么他又是如何做到一个线程,一个 while 死循环就能监测1w个连接是否有数据可读的呢? 这就是 NIO 模型中 selector 的作用,一条连接来了之后,现在不创建一个 while 死循环去监听是否有数据可读了,而是直接把这条连接注册到 selector 上,然后,通过检查这个 selector,就可以批量监测出有数据可读的连接,进而读取数据,下面我再举个非常简单的生活中的例子说明 IO 与 NIO 的区别。
在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有 100 个小朋友,有两种方案可以解决小朋友上厕所的问题:
- 1、每个小朋友配一个老师。每个老师隔段时间询问小朋友是否要上厕所,如果要上,就领他去厕所,100 个小朋友就需要 100 个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程。
- 2、所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所的小朋友批量领到厕所,这就是 NIO 模型,所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到一个线程,然后批量轮询。
这就是 NIO 模型解决线程资源受限的方案,实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于 IO 模型中一个线程管理一条连接,消耗的线程资源大幅减少
线程切换效率低下
由于 NIO 模型中线程数量大大降低,线程切换效率因此也大幅度提高
IO读写面向流
IO 读写是面向流的,一次性只能从流中读取一个或者多个字节,并且读完之后流无法再读取,你需要自己缓存数据。 而 NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可。
更多详细请看下面的文章:
Netty入门与实战:仿写微信IM即时通讯系统
一、netty开发的基本套路
Netty开发的基本套路很简洁,服务器端和客户端都是这样。
大致的套路基本如下:
Netty开发的实际代码过程,也确实并不复杂,就像下图这样,绿色的代表客户端流程、蓝色的代表服务器端流程,注意标红的部分。
实际代码过程就像下图这样:
二、 创建客户端类
pom.xml 引入依赖:
<!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.48.Final</version>
<scope>compile</scope>
</dependency>
1、创建Handler:
首先创建Handler类,该类用于接收服务器端发送的数据,这是一个简化的类,只重写了消息读取方法channelRead0、捕捉异常方法exceptionCaught。
客户端的Handler一般继承的是 SimpleChannelInboundHandler,该类有丰富的方法,心跳、超时检测、连接状态等等。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* @Date: 2020/6/1 11:12
* @Description: 通用handler,处理I/O事件
*/
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception
{
/**
* @Description 处理接收到的消息
**/
System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException
{
/**
* @Description 处理I/O事件的异常
**/
cause.printStackTrace();
ctx.close();
}
}
代码说明:
1)@ChannelHandler.Sharable
:这个注解是为了线程安全,如果你不在乎是否线程安全,不加也可以;
2)SimpleChannelInboundHandler
:这里的类型可以是ByteBuf,也可以是String,还可以是对象,根据实际情况来;
3)channelRead0
:消息读取方法,注意名称中有个0;
4)ChannelHandlerContext
:通道上下文,代指Channel;
5)ByteBuf
:字节序列,通过ByteBuf操作基础的字节数组和缓冲区,因为JDK原生操作字节麻烦、效率低,所以Netty对字节的操作进行了封装,实现了指数级的性能提升,同时使用更加便利;
6)CharsetUtil.UTF_8
:这个是JDK原生的方法,用于指定字节数组转换为字符串时的编码格式。
2、创建客户端启动类:
客户端启动类根据服务器端的IP和端口,建立连接,连接建立后,实现消息的双向传输。
package ai.tradingquant.netttydemo;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
/**
* 客户端启动类
*/
public class AppClientHello {
private final String host;
private final int port;
public AppClientHello(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();// I/O 线程池
try{
Bootstrap bootstrap = new Bootstrap(); // 客户端辅助启动类
bootstrap.group(group).channel(NioSocketChannel.class) // 实例化一个channel
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() { // 进行通道初始化配置
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception{
socketChannel.pipeline().addLast(new HandlerClientHello()); // 添加自定义的Handler
}
});
// 连接到远程节点,等待连接完成
ChannelFuture future = bootstrap.connect().sync();
// 发送消息到服务器端,编码格式是utf-8
future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello world, SIJIALI", CharsetUtil.UTF_8));
// 阻塞操作,closeFuture() 开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new AppClientHello("127.0.0.1", 18080).run();
}
}
由于代码中已经添加了详尽的注释,这里只对极个别的进行说明:
1)ChannelInitializer:通道Channel的初始化工作,如加入多个handler,都在这里进行;
2)bootstrap.connect().sync():这里的sync()表示采用的同步方法,这样连接建立成功后,才继续往下执行;
3)pipeline():连接建立后,都会自动创建一个管道pipeline,这个管道也被称为责任链,保证顺序执行,同时又可以灵活的配置各类Handler,这是一个很精妙的设计,既减少了线程切换带来的资源开销、避免好多麻烦事,同时性能又得到了极大增强。
三、 创建服务端类
1、创建Handler:
和客户端一样,只重写了消息读取方法channelRead(注意这里不是channelRead0)、捕捉异常方法exceptionCaught。
另外服务器端Handler继承的是 ChannelInboundHandlerAdapter
,而不是SimpleChannelInboundHandler
,至于这两者的区别,这里不赘述,大家自行百度吧。
import io.netty.*;
/**
* @Date: 2020/6/1 11:47
* @Description: 服务器端I/O处理类
*/
@ChannelHandler.Sharable
public class HandlerServerHello extends ChannelInboundHandlerAdapter
{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
//处理收到的数据,并反馈消息到到客户端
ByteBuf in = (ByteBuf) msg;
System.out.println("收到客户端发过来的消息: "+ in.toString(CharsetUtil.UTF_8));
//写入并发送信息到远端(客户端)
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是服务端,我已经收到你发送的消息", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
//出现异常的时候执行的动作(打印并关闭通道)
cause.printStackTrace();
ctx.close();
}
}
以上代码很简洁,大家注意和客户端Handler类进行比较。
2、创建服务器端启动类:
服务器端启动类比客户端启动类稍显复杂一点,先贴出代码如下:
import io.netty.*;
import java.net.InetSocketAddress;
/**
* @Date: 2020/6/1 11:51
* @Description: 服务器端启动类
*/
public class AppServerHello
{
private int port;
public AppServerHello(int port)
{
this.port = port;
}
public void run() throws Exception
{
EventLoopGroup group = newNioEventLoopGroup();//Netty的Reactor线程池,初始化了一个NioEventLoop数组,用来处理I/O操作,如接受新的连接和读/写数据
try{
ServerBootstrap b = newServerBootstrap();//用于启动NIO服务
b.group(group)
.channel(NioServerSocketChannel.class) //通过工厂方法设计模式实例化一个channel
.localAddress(newInetSocketAddress(port))//设置监听端口
.childHandler(newChannelInitializer<SocketChannel>() {
//ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel,用于把许多自定义的处理类增加到pipline上来
@Override
public void initChannel(SocketChannel ch) throws Exception {//ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。
ch.pipeline().addLast(new HandlerServerHello());//配置childHandler来通知一个关于消息处理的InfoServerHandler实例
}
});
//绑定服务器,该实例将提供有关IO操作的结果或状态的信息
ChannelFuture channelFuture= b.bind().sync();
System.out.println("在"+ channelFuture.channel().localAddress()+"上开启监听");
//阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
channelFuture.channel().closeFuture().sync();
} finally{
group.shutdownGracefully().sync();//关闭EventLoopGroup并释放所有资源,包括所有创建的线程
}
}
public static void main(String[] args) throws Exception
{
new AppServerHello(18080).run();
}
}
代码说明:
1)EventLoopGroup:实际项目中,这里创建两个EventLoopGroup的实例,一个负责接收客户端的连接,另一个负责处理消息I/O,这里为了简单展示流程,让一个实例把这两方面的活都干了;
2)NioServerSocketChannel:通过工厂通过工厂方法设计模式实例化一个channel,这个在大家还没有能够熟练使用Netty进行项目开发的情况下,不用去深究。
到这里,我们就把服务器端和客户端都写完了 ,如何运行呢,先在服务器端启动类上右键,点Run 'AppServerHello.main()'菜单运行,见下图。
四、启动测试
启动服务端:
启动客户端:
相关文章:
史上最通俗Netty入门长文:基本介绍、环境搭建、动手实战
Netty入门与实战:仿写微信IM即时通讯系统
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)