netty使用详解

简介

  • Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。

  • 他能通过编程自定义各种协议。

高并发原理

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高,两张图让你了解BIO和NIO的区别:

阻塞:

阻塞io通信方式

非阻塞:

非阻塞IO通信方式

当一个连接建立之后,他有两个步骤要做,第一步是接收完客户端发过来的全部数据,第二步是服务端处理完请求业务之后返回response给客户端。NIO和BIO的区别主要是在第一步。
在BIO中,等待客户端发数据这个过程是阻塞的,这样就造成了一个线程只能处理一个请求的情况,而机器能支持的最大线程数是有限的,这就是为什么BIO不能支持高并发的原因。
而NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断的去遍历所有的Socket,一旦有一个Socket建立完成,他会通知Thread,然后Thread处理完数据再返回给客户端——这个过程是不阻塞的,这样就能让一个Thread处理更多的请求了。

开发流程

基本流程

详细流程

关键知识点

springBoot整合Netty

添加依赖

  <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <!--不用加版本号-->
  </dependency>

服务端

public class NettyServer {
    public static void main(String[] args) {
    
    
    
    	//1 . 创建两个EventLoopGroup,这两个可以看作是线程池,用于循环事件,
    	//分别是bossGroup与workerGroup(你也可以管他们叫parentGroup 与childGroup)
    	//这个两个名字是有实际含义的,表示任务分工不一样,bossGroup其实就是专门用来创建连接的,
    	//而workerGroup专门就是接收请求,处理请求,发送响应的,也就是专门干活的
        EventLoopGroup  bossGroup=new NioEventLoopGroup(1);
        EventLoopGroup  workerGroup=new NioEventLoopGroup();
        
        
        try {
        
        
        	//2. ServerBootstrap这个可以理解为服务器
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            
            
            
            //3. group主要传刚刚创建的两个group
            //   channel的话就是使用哪种类型的channel
            //   option的话就是服务端程序的设置
            //   childOption 是与客户端连接的一些设置
            //   childHandler其实就是与某个客户端通信的时候,
            //比如读取客户端发送过来的数据,读到这些数据之后,进行的一些处理,这里是一个初始化的handler
            //	 
            serverBootstrap
                    .group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .option(ChannelOption.SO_REUSEADDR,true)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childOption(ChannelOption.SO_RCVBUF,64*1024)
                    .childOption(ChannelOption.SO_SNDBUF,64*1024)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                        
                        
                        
                        //3.1 也就是当与某个客户端建立连接完事的时候,将这个连接channel丢给
                        //某个workerGroup中的某个线程进行注册,并初始化处理链,这里面这个
                        //pipeline 就是一条处理链,每个连接有一个pipeline,我们这里往
                        //pipeline处理链后面添加了一个处理器,也就是NettyServerHandler 
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
                    
                    
                    
            // 4. 绑定端口,监听端口
            ChannelFuture channelFuture= serverBootstrap.bind(10009).sync();
            
            
            // 5. 作用是等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

这个handler是放到pipeline里面的,也就是某个连接有读写事件的时候,worker线程就会调用pipeline处理链进行处理,这里重写了channelRead方法,就是某个客户端发送过来数据,然后worker线程拿到这个read事件的发生,就会调用pipeline进行数据的处理,这里就是读出来数据,然后打印,最后又发送响应给客户端。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf= (ByteBuf)msg;
        byte[] body=new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(body);
        System.out.println(new String(body));
        byte[] responseBytes = "hi,客户端,我是服务端".getBytes();
        ctx.channel().writeAndFlush(Unpooled.wrappedBuffer(responseBytes));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

客户端

客户端程序跟服务端程序差不多,就是EventLoopGroup组少了一个,然后对应的channel就变成了NioSocketChannel。

最后是connect进行连接。这里需要注意的是,我们将handler放到了bossGroup的pipeline处理链中,这个bossGroup干的事情就是建立连接,发送数据,读取响应。

public class NettyClient {

    public static void main(String[] args) {


        EventLoopGroup  bossGroup=new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap
                    .group(bossGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_RCVBUF, 64 * 1024)
                    .option(ChannelOption.SO_SNDBUF, 64 * 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 10009).sync();
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
        }

    }
}

channelActive 这个方法是当连接建立完成的时候,然后向服务端发送了hi,服务端,我是客户端! 这句话,当收到服务端响应的时候,就会走channelRead方法,这里就是读取到数据,然后打印一下。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {


    /**
     * 连接建立的完成
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        byte[] bytes = "hi,服务端,我是客户端!".getBytes();
        ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
        ctx.channel().writeAndFlush(byteBuf);
    }

    /**
     * 读取数据
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf responseByteBuf= (ByteBuf)msg;
        byte[] responseByte=new byte[responseByteBuf.readableBytes()];
        responseByteBuf.readBytes(responseByte);
        System.out.println(new String(responseByte));
    }


    /**
     * 异常
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×