Netty基础

概述

TCP的编程要暂时脱离基于HTTP的MVC的编程范式,从网络的角度去看看一步一步的进化,这里看到了跟redis相同的Reactor模型。

首先,我们从BIO开始进入TCP编程,在BIO部分只做简单的介绍;

然后,进入NIO部分,对于多路复用的方式进行详细介绍;

接着,介绍Reactor线程模型;

最后,进入Netty部分,详细介绍Netty的基本概念与架构,介绍一个示例;

BIO

BIO(Block IO)是一种阻塞的IO模型,其结构如下:

BIO模型

其特点是,每个请求都需要独立的线程完成数据read,业务处理,数据write的完整操作。

这样带来了2个问题:

  • 这样当并发量较大时,创建大量线程来处理连接,系统资源占用大

  • 每个线程内部由于是堵塞缘故,效率也不高

@Slf4j
public class BIOServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(10002);
        while (true) {
            Socket client = server.accept(); //等待客户端的连接,如果没有获取连接  ,在此步一直等待
            new Thread(new ServerThread(client)).start(); //为每个客户端连接开启一个线程
        }
        //server.close();
    }
}
@Slf4j
class ServerThread extends Thread {
    private Socket client;
    public ServerThread(Socket client) {
        this.client = client;
    }
    @Override
    public void run() {
        log.info("客户端:" + client.getInetAddress().getLocalHost() + "已连接到服务器");
        BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
        String mess = br.readLine();
        log.info("客户端:" + mess);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
        bw.write(mess + "\n");
        bw.flush();
    }
}

主线程中进行accept,阻塞等待连接,各个ServerThread中进行连接的处理。read、write都在一个线程中完成,且都是阻塞的。

NIO

在操作系统级别就出现了多路复用技术,通过select或epoll来进行IO服用。在Java语言方面,与之对应的是NIO技术。

多路复用

先来看看多路复用的流程:

多路服用

以上是select,需要不断去遍历fd,epoll过程与之类似,好处是内核回调方式。

在多路复用的基础上,出现了NIO(New I/O 或者Non-Block I/O)

NIO模型

NIO 有三大核心部分:Channel( 通道) ,Buffer( 缓冲区), Selector( 选择器),关系如下图所示:

NIO
  • Selector对应一个线程,对应多个Channel
  • 每个Channel对应一个Buffer
  • Buffer 就是一个内存块,NIO的Buffer可以读也可写,通过flip方法切换。

示例

示例代码:

@Slf4j
public class NIOServer {
    private InetAddress addr;
    private int port;
    private Selector selector;
    private static int BUFF_SIZE = 1024;
    public NIOServer(InetAddress addr, int port) throws IOException {
        this.addr = addr;
        this.port = port;
        startServer();
    }
    private void startServer() throws IOException {
        // 获得selector及通道(socketChannel)
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        // 绑定地址及端口
        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        log.info("NIOServer运行中...按下Ctrl-C停止服务");
        while (true) {
            log.info("服务器等待新的连接和selector选择…");
            this.selector.select();
            // 选择key工作
            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();
                // 防止出现重复的key,处理完需及时移除
                keys.remove();
                //无效直接跳过
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    this.accept(key);
                } else if (key.isReadable()) {
                    this.read(key);
                } else if (key.isWritable()) {
                    this.write(key);
                } else if (key.isConnectable()) {
                    this.connect(key);
                }
            }
        }
    }
    
    private void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if (channel.finishConnect()) {
            // 成功
            log.info("成功连接了");
        } else {
            // 失败
            log.info("失败连接");
        }
    }
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);

        Socket socket = channel.socket();
        SocketAddress remoteAddr = socket.getRemoteSocketAddress();
        log.info("连接到: " + remoteAddr);
    }
    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();

        ByteBuffer buffer = ByteBuffer.allocate(BUFF_SIZE);
        int numRead = channel.read(buffer);
        if (numRead == -1) {
            log.info("关闭客户端连接: " + channel.socket().getRemoteSocketAddress());
            channel.close();
            return;
        }
        String msg = new String(buffer.array()).trim();
        log.info("得到了: " + msg);

        // 回复客户端
        String reMsg = msg + " 你好,这是BIOServer给你的回复消息:" + System.currentTimeMillis();
        channel.write(ByteBuffer.wrap(reMsg.getBytes()));
    }
    private void write(SelectionKey key) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFF_SIZE);
        byteBuffer.flip();
        SocketChannel clientChannel = (SocketChannel) key.channel();
        while (byteBuffer.hasRemaining()) {
            clientChannel.write(byteBuffer);
        }
        byteBuffer.compact();
    }

    public static void main(String[] args) throws IOException {
        new NIOServer(null, 10002);
    }
}

Selector中有selectedKeys的容器,selectedKey中包含着各个Channel。

ServerSocketChannel、SocketChannel都是Channel,ServerSocketChannel是主线程管理连接的channel,SocketChannel是处理客户请求的channel。

这里我们看到,Accept、read、write都不是在一个线程中完成,而是根据被分到各个channel中去执行。

@Slf4j
public class NIOClient {
    private static int BUFF_SIZE = 1024;
    public static void main(String[] args) throws IOException, InterruptedException {
        InetSocketAddress socketAddress = new InetSocketAddress("0.0.0.0", 10002);
        SocketChannel socketChannel = SocketChannel.open(socketAddress);
        log.info("连接 BIOServer 服务,端口:10002...");
        ArrayList<String> companyDetails = new ArrayList<>();

        // 创建消息列表
        companyDetails.add("腾讯");
        companyDetails.add("阿里巴巴");
        companyDetails.add("百度");

        for (String companyName : companyDetails) {
            socketChannel.write(ByteBuffer.wrap(companyName.getBytes()));
            log.info("发送: " + companyName);
            ByteBuffer buffer = ByteBuffer.allocate(BUFF_SIZE);
            buffer.clear();
            socketChannel.read(buffer);
            String result = new String(buffer.array()).trim();
            log.info("收到NIOServer回复的消息:" + result);
            // 等待2秒钟再发送下一条消息
            Thread.sleep(2000);
        }
        socketChannel.close();
    }
}

Reactor线程模型

NIO是典型的Reactor模型,一个或多个输入同时传递给服务处理器,服务端程序监听请求事件,并将它们同步分派给请求对应的处理线程。

Reactor模型有3种变种:

  • 单Reactor单线程
  • 单Reactor多线程
  • 主从Reactor多线程

单Reactor单线程

单线程Reactor

accept、IO(read+send)、handle都在一个线程中

单Reactor多线程

reactor多线程

这种情况下,增加worker threads,将工作放到其中运行。

accept、IO在一个线程,handler在线程池中运行

主从reactor的方式*

多Reactor模式

在这里将acceptor与read、send进行分离。

  • MainReactor负责客户端的连接请求,并将请求转交给SubReactor
  • SubReactor负责相应通道的IO读写请求
  • 非IO请求(具体逻辑处理)的任务则会直接写入队列,等待worker threads进行处理

Netty就是一种典型的基于主从Reactor的框架。

accept在一个线程,IO一个线程,handler一个线程池

Netty

Netty实现了对NIO的封装,通过这种封装,使用更方便,性能更强大。

基本概念

  • Bootstrap、ServerBootstrap

    Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。

  • Future、ChannelFuture

    正如前面介绍,在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

  • NioEventLoop

    NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

    • I/O任务 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发(SelectedKey中封装Channel)。
    • 非IO任务 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。

    两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

  • NioEventLoopGroup

    NioEventLoopGroup,主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel上的事件,而一个Channel只对应于一个线程。

  • Selector

    Netty基于Selector对象实现I/O多路复用,通过 Selector, 一个线程可以监听多个连接的Channel事件, 当向一个Selector中注册Channel 后,Selector 内部的机制就可以自动不断地查询(select) 这些注册的Channel是否有已就绪的I/O事件(例如可读, 可写, 网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

  • Channel

    Netty网络通信的组件,能够用于执行网络I/O操作。 Channel为用户提供:

    • 当前网络连接的通道的状态(例如是否打开?是否已连接?)
    • 网络连接的配置参数 (例如接收缓冲区大小)
    • 提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I / O调用都将立即返回,并且不保证在调用结束时所请求的I / O操作已完成。调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I / O操作成功、失败或取消时回调通知调用方。
    • 支持关联I/O操作与对应的处理程序
  • ChannelHandler

    ChannelHandler是一个接口,处理I / O事件或拦截I / O操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序。

    ChannelHandler本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

    • ChannelInboundHandler用于处理入站I / O事件
    • ChannelOutboundHandler用于处理出站I / O操作

    或者使用以下适配器类:

    • ChannelInboundHandlerAdapter用于处理入站I / O事件
    • ChannelOutboundHandlerAdapter用于处理出站I / O操作
    • ChannelDuplexHandler用于处理入站和出站事件
  • ChannelHandlerContext

    保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象

  • ChannelPipline

    保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作。 ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互。

    一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler。

Netty基本概念

Netty架构*

Netty架构

示例

服务端代码

NettyServer.java

@Component
@Slf4j
public class NettyServer {
    /**
     * boss 线程组用于处理连接工作
     */
    private EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * work 线程组用于数据处理
     */
    private EventLoopGroup work = new NioEventLoopGroup();

    @Value("${netty.port}")
    private Integer port;

    /**
     * 启动Netty Server
     *
     * @throws InterruptedException
     */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // 指定Channel
                .channel(NioServerSocketChannel.class)
            
                //使用指定的端口设置套接字地址
                .localAddress(new InetSocketAddress(port))

                //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
                .option(ChannelOption.SO_BACKLOG, 1024)

                //设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //将小的数据包包装成更大的帧进行传送,提高网络的负载
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(new ServerChannelInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("启动 Netty Server");
        }
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        work.shutdownGracefully().sync();
        log.info("关闭Netty");
    }
}

ServerChannelInitializer.java

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //添加编解码
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}

在这里增加对消息的的编码器、解码器。

NettyServerHandler.java

package com.easy.nettyServer;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 客户端连接会触发
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel active......");
    }

    /**
     * 客户端发消息会触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("服务器收到消息: {}", msg.toString());
        ctx.write("我是服务端,我收到你的消息了!");
        ctx.flush();
    }

    /**
     * 发生异常触发
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端代码

NettyClient.java

@Component
@Slf4j
public class NettyClient {

    private EventLoopGroup group = new NioEventLoopGroup();
    @Value("${netty.port}")
    private Integer port;

    @Value("${netty.host}")
    private String host;

    private SocketChannel socketChannel;

    /**
     * 发送消息
     */
    public void sendMsg(String msg) {
        socketChannel.writeAndFlush(msg);
    }

    @PostConstruct
    public void start() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(host, port)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new NettyClientInitializer());
        ChannelFuture future = bootstrap.connect();
        //客户端断线重连逻辑
        future.addListener((ChannelFutureListener) future1 -> {
            if (future1.isSuccess()) {
                log.info("连接Netty服务端成功");
            } else {
                log.info("连接失败,进行断线重连");
                future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
            }
        });
        socketChannel = (SocketChannel) future.channel();
    }
}

这里增加了空闲检测,当服务器异常退出时,客户端进行重连接,ChannelFuture添加一个监听器,如果客户端连接服务端失败,调用 channel().eventLoop().schedule()方法执行重试逻辑。

NettyClientInitializer.java

public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast("decoder", new StringDecoder());
        socketChannel.pipeline().addLast("encoder", new StringEncoder());
        socketChannel.pipeline().addLast(new NettyClientHandler());
    }
}

NettyClientHandler.java

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端Active .....");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端收到消息: {}", msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

在这里除了进行io操作,还可以进行一些诸如心跳机制的逻辑。重写userEventTriggered等函数,在该方法里实现发送心跳数据包的逻辑,同时将 IdleStateEvent类加入逻辑处理链上。

对于心跳,服务端作为被动接收一方,如果一段时间内服务端没有收到心跳包那么就直接断开连接。

一般有以下两种情况,Netty 客户端需要重连服务端:

  • Netty 客户端启动时,服务端挂掉,连不上服务端。

  • 在程序运行过程中,服务端突然挂掉

参考

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×