• netty分为服务端/客户端、init、handler三层,init进行多处理器组装,在handler中进行处理

服务端

需求:多个客户端连接服务器进行通信,获取所有客户端对象,对每个客户(包括自己)发送(回显)信息

MyChatServer

public class MyChatServer {
    public static void main(String[] args) throws Exception {
        //事件循环组,boosGroup负责获取连接 后发送连接给workerGroup进行处理
        EventLoopGroup boosGroup= new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            //a helper class that makes it easy to bootstrap
            //一个对连接进行了封装、简化的服务端启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            //定义组、子处理器等,MyChatServerInit在channel被注册时,就会创建调用
            serverBootstrap.group(boosGroup,workerGroup).channel(NioServerSocketChannel.class).
                    childHandler(new MyChatServerInit());

            //绑定一个端口并且同步
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            //优雅关闭 -- Elegant close
            boosGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

MyChatServerInit

public class MyChatServerInit extends ChannelInitializer<SocketChannel> {
    //这是一个回调的方法,在channel被注册时被调用
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //一个管道,包含多个ChannelHandler进行处理 ≈ 拦截器
        ChannelPipeline channelPipeline = ch.pipeline();

        //TCP粘包处理  看 Working process了解
        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));

        //对字符串进行编解码的处理器
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

        //自定义处理器 -- 进行核心处理
        channelPipeline.addLast(new MyChatServerHandler());
    }
}

MyChatServerHandler

/**
 * 继承InboundHandler类,代表处理进入的请求,还有OutboundHandler,处理出去请求
 * 其中里面的泛型表示msg的类型,如果指定了HttpObject,表明这是个HTTP连接的对象
 */
public class MyChatServerHandler extends SimpleChannelInboundHandler<String> {

    //定义一个对象组,保存所有客户端对象
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    //channelRead0读取客户端请求,并返回响应的方法
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();

        //获取并判断当前对象进行处理,客户端输入的数据 会给所有对象进行消息推送,包括自己
        channelGroup.forEach(ch ->{
            if(channel != ch) {
                ch.writeAndFlush("[客户]"+channel.remoteAddress()+" 发送的消息:"+msg+"\n");
            }else {
                ch.writeAndFlush("[自己]"+msg+"\n");
            }
        });
    }

     /**
     * handler处理顺序如下:
     * handlerAdded
     * channelRegistered
     * channelActive
     * (下面的表示的是断开连接后)
     * 1.如果是使用curl :连接会立刻关闭
     * 2.如果是浏览器访问,http1.0:它是短连接,会立刻关闭。http1.1,是长连接,连接保持一段时间
     * channelInactive
     * channelUnregistered
     */

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channel.writeAndFlush("[客户端]-"+channel.remoteAddress()+"加入"+"\n");
        channelGroup.add(channel);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channel.writeAndFlush("[客户端]-"+channel.remoteAddress()+"离开"+"\n");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+"上线啦!!!!");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress()+"离线啦!!!!");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

需求:客户端连接服务端后,给所有其他客户端发送/显示消息,跟服务器一样分三层

MyChatClient

public class MyChatClient {
    public static void main(String[] args)throws Exception {

        //定义事情循环组进行处理,客户端定义一个组只进行监听处理
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).
                    handler(new MyChatClientInit());

            Channel channel = bootstrap.connect("localhost",8899).sync().channel();

            //建立好连接后,等待用户输入内容后写入channel,再发送到服务器中
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                channel.writeAndFlush(reader.readLine()+"\r\n");
            }
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

MyChatServerInit

跟服务器端差不多,只需要改成客户端的handler

public class MyChatClientInit extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        channelPipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast(new MyChatChlientHandler());
    }
}

MyChatChlientHandler

public class MyChatChlientHandler extends SimpleChannelInboundHandler<String> {
    //客户端只需要在`控制台`中显示 服务端的提示和 其他客户端发送的信息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

program_result

启动服务器后,依次运行三个客户端,在服务端控制台显示客户端上线信息,当客户端发送消息换行后,会在其他客户端中显示发送的消息。

服务端

客户端1

客户端2

客户端3

working_process_tips

IDEA如何启动多个客户端

Edit Configurations --> Allow parellel run
idea启动多个客户端

TCP粘包处理

java netty使用DelimiterBasedFrameDecoder处理tcp粘包问题
TCP的粘包现象

    1. 粘包现象

      TCP粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾

    1. 粘包原因
      (1)发送方原因
        TCP默认会使用Nagle算法。而Nagle算法主要做两件事:1)只有上一个分组得到确认,才会发送下一个分组;2)收集多个小分组,在一个确认到来时一起发送。所以,正是Nagle算法造成了发送方有可能造成粘包现象。
      (2)接收方原因
        TCP接收到分组时,并不会立刻送至应用层处理,或者说,应用层并不一定会立即处理;实际上,TCP将收到的分组保存至接收缓存里,然后应用程序主动从缓存里读收到的分组。这样一来,如果TCP接收分组的速度大于应用程序读分组的速度,多个包就会被存至缓存,应用程序读时,就会读到多个首尾相接粘到一起的包。
    1. 接收方应用层处理

      应用程序在处理从缓存读来的分组时,读完一条数据,就应该循环读下一条数据,直到所有的数据都被处理,其中关键在于如何给每个数据包添加边界信息

  1)格式化数据:每条数据有固定的格式(开始符、结束符),这种方法简单易行,但选择开始符和结束符的时候一定要注意每条数据的内部一定不能出现开始符或结束符;
  2)发送长度:发送每条数据的时候,将数据的长度一并发送,比如可以选择每条数据的前4位是数据的长度,应用层处理时可以根据长度来判断每条数据的开始和结束。