Netty 实战

NIO编程

关于 NIO 相关的文章网上也有很多,这里不打算详细深入分析,下面简单描述一下 NIO 是如何解决以上三个问题的。

线程资源受限

NIO 编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,那么他是怎么做到的?我们用一幅图来对比一下 IO 与 NIO
file

如上图所示,IO 模型中,一个连接来了,会创建一个线程,对应一个 while 死循环,死循环的目的就是不断监测这条连接上是否有数据可以读,大多数情况下,1w 个连接里面同一时刻只有少量的连接有数据可读,因此,很多个 while 死循环都白白浪费掉了,因为读不出啥数据。

而在 NIO 模型中,他把这么多 while 死循环变成一个死循环,这个死循环由一个线程控制,那么他又是如何做到一个线程,一个 while 死循环就能监测1w个连接是否有数据可读的呢? 这就是 NIO 模型中 selector 的作用,一条连接来了之后,现在不创建一个 while 死循环去监听是否有数据可读了,而是直接把这条连接注册到 selector 上,然后,通过检查这个 selector,就可以批量监测出有数据可读的连接,进而读取数据,下面我再举个非常简单的生活中的例子说明 IO 与 NIO 的区别。

在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有 100 个小朋友,有两种方案可以解决小朋友上厕所的问题:

  • 1、每个小朋友配一个老师。每个老师隔段时间询问小朋友是否要上厕所,如果要上,就领他去厕所,100 个小朋友就需要 100 个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程。
  • 2、所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所的小朋友批量领到厕所,这就是 NIO 模型,所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到一个线程,然后批量轮询。

这就是 NIO 模型解决线程资源受限的方案,实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于 IO 模型中一个线程管理一条连接,消耗的线程资源大幅减少

线程切换效率低下

由于 NIO 模型中线程数量大大降低,线程切换效率因此也大幅度提高

IO读写面向流

IO 读写是面向流的,一次性只能从流中读取一个或者多个字节,并且读完之后流无法再读取,你需要自己缓存数据。 而 NIO 的读写是面向 Buffer 的,你可以随意读取里面任何一个字节数据,不需要你自己缓存数据,这一切只需要移动读写指针即可。

更多详细请看下面的文章:
Netty入门与实战:仿写微信IM即时通讯系统

一、netty开发的基本套路

Netty开发的基本套路很简洁,服务器端和客户端都是这样。
大致的套路基本如下:
file

Netty开发的实际代码过程,也确实并不复杂,就像下图这样,绿色的代表客户端流程、蓝色的代表服务器端流程,注意标红的部分。

实际代码过程就像下图这样:
file

二、 创建客户端类

pom.xml 引入依赖:

  <!-- netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.48.Final</version>
            <scope>compile</scope>
        </dependency>

1、创建Handler:

首先创建Handler类,该类用于接收服务器端发送的数据,这是一个简化的类,只重写了消息读取方法channelRead0、捕捉异常方法exceptionCaught。

客户端的Handler一般继承的是 SimpleChannelInboundHandler,该类有丰富的方法,心跳、超时检测、连接状态等等。

    import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
 * @Date: 2020/6/1 11:12
 * @Description: 通用handler,处理I/O事件
 */
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception
    {
        /**
        * @Description  处理接收到的消息
        **/
        System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException
    {
        /**
        * @Description  处理I/O事件的异常
        **/
        cause.printStackTrace();
        ctx.close();
    }
}

代码说明:

1)@ChannelHandler.Sharable:这个注解是为了线程安全,如果你不在乎是否线程安全,不加也可以;
2)SimpleChannelInboundHandler:这里的类型可以是ByteBuf,也可以是String,还可以是对象,根据实际情况来;
3)channelRead0:消息读取方法,注意名称中有个0;
4)ChannelHandlerContext:通道上下文,代指Channel;
5)ByteBuf:字节序列,通过ByteBuf操作基础的字节数组和缓冲区,因为JDK原生操作字节麻烦、效率低,所以Netty对字节的操作进行了封装,实现了指数级的性能提升,同时使用更加便利;
6)CharsetUtil.UTF_8:这个是JDK原生的方法,用于指定字节数组转换为字符串时的编码格式。

2、创建客户端启动类:
客户端启动类根据服务器端的IP和端口,建立连接,连接建立后,实现消息的双向传输。

package ai.tradingquant.netttydemo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

/**
 * 客户端启动类
 */
public class AppClientHello {
    private final String host;
    private final int port;

    public AppClientHello(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void  run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();// I/O 线程池

        try{

            Bootstrap bootstrap = new Bootstrap(); // 客户端辅助启动类
            bootstrap.group(group).channel(NioSocketChannel.class)  // 实例化一个channel
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() { // 进行通道初始化配置

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception{
                            socketChannel.pipeline().addLast(new HandlerClientHello()); // 添加自定义的Handler
                        }
                    });

            // 连接到远程节点,等待连接完成
            ChannelFuture future = bootstrap.connect().sync();

            // 发送消息到服务器端,编码格式是utf-8
            future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello world, SIJIALI", CharsetUtil.UTF_8));

            // 阻塞操作,closeFuture() 开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new AppClientHello("127.0.0.1", 18080).run();
    }
}

由于代码中已经添加了详尽的注释,这里只对极个别的进行说明:

1)ChannelInitializer:通道Channel的初始化工作,如加入多个handler,都在这里进行;
2)bootstrap.connect().sync():这里的sync()表示采用的同步方法,这样连接建立成功后,才继续往下执行;
3)pipeline():连接建立后,都会自动创建一个管道pipeline,这个管道也被称为责任链,保证顺序执行,同时又可以灵活的配置各类Handler,这是一个很精妙的设计,既减少了线程切换带来的资源开销、避免好多麻烦事,同时性能又得到了极大增强。

三、 创建服务端类

1、创建Handler:

和客户端一样,只重写了消息读取方法channelRead(注意这里不是channelRead0)、捕捉异常方法exceptionCaught。

另外服务器端Handler继承的是 ChannelInboundHandlerAdapter,而不是SimpleChannelInboundHandler,至于这两者的区别,这里不赘述,大家自行百度吧。

import io.netty.*;
/**
 * @Date: 2020/6/1 11:47
 * @Description: 服务器端I/O处理类
 */
@ChannelHandler.Sharable
public class HandlerServerHello extends ChannelInboundHandlerAdapter
{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)  throws Exception
    {
        //处理收到的数据,并反馈消息到到客户端
        ByteBuf in = (ByteBuf) msg;
        System.out.println("收到客户端发过来的消息: "+ in.toString(CharsetUtil.UTF_8));

        //写入并发送信息到远端(客户端)
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是服务端,我已经收到你发送的消息", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        //出现异常的时候执行的动作(打印并关闭通道)
        cause.printStackTrace();
        ctx.close();
    }
}

以上代码很简洁,大家注意和客户端Handler类进行比较。

2、创建服务器端启动类:
服务器端启动类比客户端启动类稍显复杂一点,先贴出代码如下:

import io.netty.*;
import java.net.InetSocketAddress;

/**
 * @Date: 2020/6/1 11:51
 * @Description: 服务器端启动类
 */
public class AppServerHello
{
    private int port;

    public AppServerHello(int port)
    {
        this.port = port;
    }

    public void run() throws Exception
    {
        EventLoopGroup group = newNioEventLoopGroup();//Netty的Reactor线程池,初始化了一个NioEventLoop数组,用来处理I/O操作,如接受新的连接和读/写数据
        try{
            ServerBootstrap b = newServerBootstrap();//用于启动NIO服务
            b.group(group)
                    .channel(NioServerSocketChannel.class) //通过工厂方法设计模式实例化一个channel
                    .localAddress(newInetSocketAddress(port))//设置监听端口
                    .childHandler(newChannelInitializer<SocketChannel>() {
                        //ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel,用于把许多自定义的处理类增加到pipline上来
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {//ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。
                            ch.pipeline().addLast(new HandlerServerHello());//配置childHandler来通知一个关于消息处理的InfoServerHandler实例
                        }
                    });

            //绑定服务器,该实例将提供有关IO操作的结果或状态的信息
            ChannelFuture channelFuture= b.bind().sync();
            System.out.println("在"+ channelFuture.channel().localAddress()+"上开启监听");

            //阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            channelFuture.channel().closeFuture().sync();
        } finally{
            group.shutdownGracefully().sync();//关闭EventLoopGroup并释放所有资源,包括所有创建的线程
        }
    }

    public static void main(String[] args)  throws Exception
    {
        new AppServerHello(18080).run();
    }
}

代码说明:

1)EventLoopGroup:实际项目中,这里创建两个EventLoopGroup的实例,一个负责接收客户端的连接,另一个负责处理消息I/O,这里为了简单展示流程,让一个实例把这两方面的活都干了;
2)NioServerSocketChannel:通过工厂通过工厂方法设计模式实例化一个channel,这个在大家还没有能够熟练使用Netty进行项目开发的情况下,不用去深究。
到这里,我们就把服务器端和客户端都写完了 ,如何运行呢,先在服务器端启动类上右键,点Run 'AppServerHello.main()'菜单运行,见下图。

四、启动测试

启动服务端:

file

启动客户端:
file


相关文章:
史上最通俗Netty入门长文:基本介绍、环境搭建、动手实战
Netty入门与实战:仿写微信IM即时通讯系统

为者常成,行者常至