V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
amiwrong123
V2EX  ›  Java

为什么我这么用 NIO 实现聊天室,有个 bug

  •  
  •   amiwrong123 · 2020-03-18 00:36:40 +08:00 · 2939 次点击
    这是一个创建于 1748 天前的主题,其中的信息可能已经有所发展或是发生改变。

    服务端代码:

    package NonBlocking;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    public class ChatServer {
        private final int port = 8899;
        private final String seperator = "[|]";                        //消息分隔符
        private final Charset charset = Charset.forName("UTF-8");    //字符集
        private ByteBuffer buffer = ByteBuffer.allocate(1024);        //缓存
        private Map<String, SocketChannel> onlineUsers = new HashMap<String, SocketChannel>();//将用户对应的 channel 对应起来
        private Selector selector;
        private ServerSocketChannel server;
    
        public void startServer() throws IOException {
            //NIO server 初始化固定流程:5 步
            selector = Selector.open();                    //1.selector open
            server = ServerSocketChannel.open();        //2.ServerSocketChannel open
            server.bind(new InetSocketAddress(port));    //3.serverChannel 绑定端口
            server.configureBlocking(false);            //4.设置 NIO 为非阻塞模式
            server.register(selector, SelectionKey.OP_ACCEPT);//5.将 channel 注册在选择器上
    
            //NIO server 处理数据固定流程:5 步
            SocketChannel client;
            SelectionKey key;
            Iterator<SelectionKey> iKeys;
    
            while (true) {
                selector.select();                            //1.用 select()方法阻塞,一直到有可用连接加入
                iKeys = selector.selectedKeys().iterator();    //2.到了这步,说明有可用连接到底,取出所有可用连接
                while (iKeys.hasNext()) {
                    key = iKeys.next();                        //3.遍历
                    if (key.isAcceptable()) {                    //4.对每个连接感兴趣的事做不同的处理
                        //对于客户端连接,注册到服务端
                        client = server.accept();            //获取客户端首次连接
                        client.configureBlocking(false);
                        //不用注册写,只有当写入量大,或写需要争用时,才考虑注册写事件
                        client.register(selector, SelectionKey.OP_READ);
                        System.out.println("+++++客户端:" + client.getRemoteAddress() + ",建立连接+++++");
                        client.write(charset.encode("请输入自定义用户名:"));
                    }
                    if (key.isReadable()) {
                        client = (SocketChannel) key.channel();//通过 key 取得客户端 channel
                        StringBuilder msg = new StringBuilder();
                        buffer.clear();        //多次使用的缓存,用前要先清空
                        try {
                            System.out.println(buffer);
                            while (client.read(buffer) > 0) {
                                buffer.flip();    //将写模式转换为读模式
                                msg.append(charset.decode(buffer));
                                buffer.clear();
                            }
                        } catch (IOException e) {
                            //如果 client.read(buffer)抛出异常,说明此客户端主动断开连接,需做下面处理
                            client.close();            //关闭 channel
                            key.cancel();            //将 channel 对应的 key 置为不可用
                            onlineUsers.values().remove(client);    //将问题连接从 map 中删除
                            System.out.println("-----用户'" + key.attachment().toString() + "'退出连接,当前用户列表:" + onlineUsers.keySet().toString() + "-----");
                            continue;                //跳出循环
                        }
                        if (msg.length() > 0) this.processMsg(msg.toString(), client, key);    //处理消息体
                    }
                    iKeys.remove();                    //5.处理完一次事件后,要显式的移除
                }
            }
        }
    
        /**
         * 处理客户端传来的消息
         *
         * @param msg 格式:user_to|body|user_from
         * @throws IOException
         * @Key 这里主要用 attach()方法,给通道定义一个表示符
         */
        private void processMsg(String msg, SocketChannel client, SelectionKey key) throws IOException {
            String[] ms = msg.split(seperator);
            if (ms.length == 1) {
                String user = ms[0];    //输入的是自定义用户名
                if (onlineUsers.containsKey(user)) {
                    client.write(charset.encode("当前用户已存在,请重新输入用户名:"));
                } else {
                    onlineUsers.put(user, client);
                    key.attach(user);    //给通道定义一个表示符
                    String welCome = "\t 欢迎'" + user + "'上线,当前在线人数" + this.getOnLineNum() + "人。用户列表:" + onlineUsers.keySet().toString();
                    client.write(charset.encode("您的昵称通过验证 "+user));
                    this.broadCast(welCome);    //给所用用户推送上线信息,包括自己
                }
            } else if (ms.length == 2) {
                String msg_body = ms[0];
                String user_from = ms[1];
                broadCast("来自'" + user_from + "'的消息:" + msg_body);
            }
        }
    
        //map 中的有效数量已被很好的控制,可以从 map 中获取,也可以用下面的方法取
        private int getOnLineNum() {
            int count = 0;
            Channel channel;
            for (SelectionKey k : selector.keys()) {
                channel = k.channel();
                if (channel instanceof SocketChannel) {    //排除 ServerSocketChannel
                    count++;
                }
            }
            return count;
        }
    
        //广播上线消息
        private void broadCast(String msg) throws IOException {
            Channel channel;
            for (SelectionKey k : selector.keys()) {
                channel = k.channel();
                if (channel instanceof SocketChannel) {
                    SocketChannel client = (SocketChannel) channel;
                    client.write(charset.encode(msg));
                }
            }
        }
    
        public static void main(String[] args) {
            try {
                new ChatServer().startServer();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    客户端代码:

    package NonBlocking;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.Scanner;
    
    public class ChatClient1 {
    
        private final int port = 8899;
        private final String seperator = "|";
        private final Charset charset = Charset.forName("UTF-8");	//字符集
        private ByteBuffer buffer = ByteBuffer.allocate(1024);
        private SocketChannel _self;
        private Selector selector;
        private String name = "";
        private boolean flag = true;	//服务端断开,客户端的读事件不会一直发生(与服务端不一样)
    
        Scanner scanner = new Scanner(System.in);
        public void startClient() throws IOException{
            //客户端初始化固定流程:4 步
            selector = Selector.open();								//1.打开 Selector
            _self = SocketChannel.open(new InetSocketAddress(port));//2.连接服务端,这里默认本机的 IP
            _self.configureBlocking(false);							//3.配置此 channel 非阻塞
            _self.register(selector, SelectionKey.OP_READ);			//4.将 channel 的读事件注册到选择器
    
            /*
             * 因为等待用户输入会导致主线程阻塞
             * 所以用主线程处理输入,新开一个线程处理读数据
             */
            new Thread(new ClientReadThread()).start();	//开一个异步线程处理读
            String input = "";
            while(flag){
                input = scanner.nextLine();
                if("".equals(input)){
                    System.out.println("不允许输入空串!");
                    continue;
                }else if("".equals(name)){
                    //name = input;
                    //selector.keys().iterator().next().attach(name);		//给通道添加名称
                }else if(!"".equals(name)) {
                    input = input + seperator + name;
                }
                try{
                    _self.write(charset.encode(input));
                }catch(Exception e){
                    System.out.println(e.getMessage()+"客户端主线程退出连接!!");
                }
            }
        }
    
        private class ClientReadThread implements Runnable{
            @Override
            public void run(){
                Iterator<SelectionKey> ikeys;
                SelectionKey key;
                SocketChannel client;
                try {
                    while(flag){
                        selector.select();	//调用此方法一直阻塞,直到有 channel 可用
                        ikeys = selector.selectedKeys().iterator();
                        while(ikeys.hasNext()){
                            key = ikeys.next();
                            if(key.isReadable()){	//处理读事件
                                client = (SocketChannel) key.channel();
                                //这里的输出是 true,从 selector 的 key 中获取的客户端 channel,是同一个
    //							System.out.println("client == _self:"+ (client == _self));
                                buffer.clear();
                                StringBuilder msg = new StringBuilder();
                                try{
                                    while(client.read(buffer) > 0){
                                        buffer.flip();	//将写模式转换为读模式
                                        msg.append(charset.decode(buffer));
                                    }
                                }catch(IOException en){
                                    System.out.println(en.getMessage()+",客户端'"+key.attachment().toString()+"'读线程退出!!");
                                    stopMainThread();
                                }
    
                                if (msg.toString().contains("您的昵称通过验证")) {
                                    String[] returnStr = msg.toString().split(" ");
                                    name = returnStr[1];
                                    key.attach(name);
                                }
                                System.out.println(msg.toString());
                            }
                            ikeys.remove();
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void stopMainThread(){
            flag = false;
        }
    
        public static void main(String[] args){
            try {
                new ChatClient1().startClient();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    使用方法:

    • 仿照着 ChatClient1,再弄一个 ChatClient2。先运行服务端,再运行客户端 1、客户端 2。
    • 客户端里,先输入自己的昵称。
    • 之后,输入想说的话。

    但不知道为什么,第一个运行的客户端,打印出来的话就是不对,第二个运行的客户端就是对的。 效果是这样的:

    第一个运行的客户端打印效果:

    请输入自定义用户名:
    dio
    您的昵称通过验证 dio	欢迎'dio'上线,当前在线人数 1 人。用户列表:[dio]
    dkajgk
    来自'dio	欢迎'dio'上线,当前在线人数 1 人。用户列表:[dio]'的消息:dkajgk
    哈哈哈哈
    来自'dio	欢迎'dio'上线,当前在线人数 1 人。用户列表:[dio]'的消息:哈哈哈哈
       欢迎'jojo'上线,当前在线人数 2 人。用户列表:[dio, jojo]
    来自'jojo'的消息:哈哈哈哈
    来自'jojo'的消息:我不做人啦
    

    第二个运行的客户端打印效果:

    请输入自定义用户名:
    jojo
    您的昵称通过验证 jojo
       欢迎'jojo'上线,当前在线人数 2 人。用户列表:[dio, jojo]
    哈哈哈哈
    来自'jojo'的消息:哈哈哈哈
    我不做人啦
    来自'jojo'的消息:我不做人啦
    

    可以看到,第一个运行的客户端,总会多打印出来,他自己上线时的通知。这是什么鬼啊?不信你们试一下 而且我要是通过 debug 方式来运行服务端,就是对的了,奇怪了啊

    13 条回复    2020-03-19 10:40:08 +08:00
    jinhan13789991
        1
    jinhan13789991  
       2020-03-18 08:34:40 +08:00 via Android
    代码重新排版下吧~不好看清楚
    amiwrong123
        2
    amiwrong123  
    OP
       2020-03-18 09:36:19 +08:00 via Android
    @jinhan13789991
    可能因为是手机上吧,等会我再发个链接出来
    amiwrong123
        3
    amiwrong123  
    OP
       2020-03-18 10:20:45 +08:00
    cion
        4
    cion  
       2020-03-18 10:47:37 +08:00
    服务器的响应信息粘包了吧,client 的 name 被错误写入了广播上线消息
    restlessdream
        5
    restlessdream  
       2020-03-18 10:55:05 +08:00
    ```java
    System.out.println(buffer);
    while (client.read(buffer) > 0) {
    buffer.flip(); //将写模式转换为读模式
    msg.append(charset.decode(buffer));
    buffer.clear();
    }
    ```

    可能原因在于这里,网络上收包不会一次性的收到发送方发送的完整包,不同的机器上表现会不同,我的机器上,发送 10 个字符,服务器要收 2 次才收到完整的消息,有可能是你服务器收到一两个字符就 read 不到数据了,需要等个几纳秒,微妙或者毫秒才有剩余的数据,对于发送方也是同样的道理。

    正确的处理方式是按照分隔符的方式来收包,比如 telnet 那种,或者按照报文长度来收包,比如报文前两个字节表示报文长度,收到期望的长度才算这个包结束。。总而言之,就是你要收到完整的你期望的报文在进行处理,而不是像现在这样简单粗暴的这么处理。。

    可能还有其他问题,总之你的代码还有其他问题,比如没有检测 key 是不是 writable 就直接 write,虽然一般情况下 accpet 之后,这个 key 就可以 write 了。。
    jinhan13789991
        6
    jinhan13789991  
       2020-03-18 11:06:26 +08:00
    可以使用 DataInputStream 的 readUTF() 和 DataOutputStream 的 writeUTF() 方法来来做写入和读取处理,
    另外建议自定义 json 来实现协议,比如:
    {
    "type": 1,
    "data": {
    "msg_id": 1,
    "msg_content": "nihao"
    }
    }

    定义 type 来区分消息类型,登陆 注册 新消息 下线等。
    jinhan13789991
        7
    jinhan13789991  
       2020-03-18 11:08:14 +08:00
    DataStream 可以保证每一次写入和读出都是完整的
    amiwrong123
        8
    amiwrong123  
    OP
       2020-03-18 11:19:51 +08:00
    @cion 但为什么第二个运行的客户端,就肯定不会粘包啊,第三个,第四个也不会
    amiwrong123
        9
    amiwrong123  
    OP
       2020-03-18 11:34:57 +08:00
    @restlessdream 好吧,你说的这个,我还没有考虑,主要刚开始熟悉 NIO,合计先用比较简单的方法来实现功能。
    你说的原因,我想了下。那个上线通知是服务端通知给客户端的啊,为毛之后 客户端给服务端发消息,会带有这个上线通知呢,这不是两个方向吗?有点没想通
    amiwrong123
        10
    amiwrong123  
    OP
       2020-03-18 13:15:39 +08:00
    @jinhan13789991
    @restlessdream
    @cion

    自己看了一遍,感觉像是,这里出错了。
    client.write(charset.encode("您的昵称通过验证 "+user));
    this.broadCast(welCome); //给所用用户推送上线信息,包括自己

    这里的两句,好像是,时间间隔太短了。然后客户端在读取的时候,把两次的信息,都一次性读取出来了。所以客户端在收到验证后的名字时,先用空格分隔,然后第二个元素里不仅有名字,还有上线通知,所以客户端把名字和上线通知作为了自己的名字。

    但是奇怪的是,为啥,第二个客户端不会发生这样的情况?
    zshneedmoney
        11
    zshneedmoney  
       2020-03-18 18:17:23 +08:00
    打个断点吧
    cion
        12
    cion  
       2020-03-19 09:08:28 +08:00
    @amiwrong123 可能是因为出现第 2 个客户端之后,广播存在遍历 SelectionKey 的过程导致不是连续向同一客户端发送报文,事实上我这边没有运行过,猜测。
    amiwrong123
        13
    amiwrong123  
    OP
       2020-03-19 10:40:08 +08:00 via Android
    @cion
    对,第二个客户端不出现确实是因为你说的。

    我昨天回去又试了,发现第一个客户端出现沾包的情况也不是必现的。

    然后觉得 tcp 只是实现了传输,在 tcp 之上还得实现对内容协议的规定,和发送回复的流程,比如 http
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1146 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 17:57 · PVG 01:57 · LAX 09:57 · JFK 12:57
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.