V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
congzhou
V2EX  ›  Java

请教大家一个关于 netty 多线程安全的一个问题,麻烦各位大佬解答一下!

  •  
  •   congzhou · 2019-07-19 21:05:07 +08:00 · 3137 次点击
    这是一个创建于 1714 天前的主题,其中的信息可能已经有所发展或是发生改变。

    netty 4.x

    rocketmq NettyServer#doOpen 添加 ChannelHandler 时,使用了 ChannelPipeline#addLast(EventExecutorGroup, ChannelHandler...) 方法,这个方法在每个 ChannelHandler 执行时都会使用 EventExecutorGroup

    ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                    .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                    .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                    .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                 // 就是这个 addLast 方法
                                .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                    new HandshakeHandler(TlsSystemConfig.tlsMode))
                                .addLast(defaultEventExecutorGroup,
                                    new NettyEncoder(),
                                    new NettyDecoder(),
                                    new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                    new NettyConnectManageHandler(),
                                    new NettyServerHandler()
                                );
                        }
                    });
    
    1. 假设 nio 触发了读事件
    2. 第一次 NioByteUnsafe#read 里面的 AbstractNioByteChannel#doReadBytes 完毕
    3. 然后执行 ChannelPipeline#fireChannelRead。
    4. 调用 AbstractChannelHandlerContext#invokeChannelRead(AbstractChannelHandlerContext, Object),每次执行的时候都会分发一个新的线程去解码 ByteToMessageDecoder#channelRead
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            // 拿到当前 ChannelHandler 指定的线程池
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                // 因为这个线程池不是 eventLoopGroupSelector 的,所以会走这边
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
    

    然后执行完这个方法就返回了,所以又会去执行 NioByteUnsafe#read 里面的 do while,重新执行上面的 1.2.3.4。然后又去执行 ByteToMessageDecoder#channelRead。 假设第一次 ByteToMessageDecoder#channelRead 还没执行,第二次 ByteToMessageDecoder#channelRead 也有可能会先执行了吧?

    第 1 条附言  ·  2019-07-19 21:52:47 +08:00

    第二行复制错了是 rocketmq NettyRemotingServer#start 最后一段 重新执行上面的 1.2.3.4 写错了,是 重新执行上面的 2.3.4 第一次写有点紧张,sorry

    第 2 条附言  ·  2019-07-19 22:22:48 +08:00

    打扰大家了,已经看到为什么了
    DefaultChannelPipeline#childExecutor

    private EventExecutor childExecutor(EventExecutorGroup group) {
            if (group == null) {
                return null;
            }
            Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
            if (pinEventExecutor != null && !pinEventExecutor) {
                return group.next();
            }
            Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
            if (childExecutors == null) {
                // Use size of 4 as most people only use one extra EventExecutor.
                childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
            }
            // Pin one of the child executors once and remember it so that the same child executor
            // is used to fire events for the same channel.
            EventExecutor childExecutor = childExecutors.get(group);
            if (childExecutor == null) {
                childExecutor = group.next();
                childExecutors.put(group, childExecutor);
            }
            return childExecutor;
        }
    

    应该一个 channel 会指定一个 SingleThreadEventExecutor

    1 条回复    2019-12-02 17:21:17 +08:00
    mazai
        1
    mazai  
       2019-12-02 17:21:17 +08:00
    netty 是不存在多线程的问题的,因为一个 Channel 注册的时候只会只会注册到一个 EventLoop 上( SingleThreadEventExecutor 本质上就是 EventLoop ),在注册的时候会判断当前执行注册的线程是不是 EventLoop 所在的线程,如果是就注册,否则就使用 EventLoop 所在的线程发起一个任务执行注册。一个 EventLoop 上会注册多个 Channel,所以在 EventLoop 维护的 Channel 生命周期中只会使用一个线程执行,因此也就不存在多线程的问题!

    注册的部分可以看 AbstractUnsafe.register();
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   3347 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 35ms · UTC 10:45 · PVG 18:45 · LAX 03:45 · JFK 06:45
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.