V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Tongwin
V2EX  ›  Java

Java 消费 Kafka 如何精确控制部分 consumer 的消费速率?

  •  
  •   Tongwin · 351 天前 · 3518 次点击
    这是一个创建于 351 天前的主题,其中的信息可能已经有所发展或是发生改变。
    请教各位大佬。
    现状:我的应用如下:
    跑一个 Java 应用,消费多个 topic 。创建了好几个 consumer 进行消费,每次 consumer.poll 得到的 records 都通过线程池 ThreadPoolExecutor 处理数据。每个 consumer 对应一个 ThreadPoolExecutor

    新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量。

    发现问题:刚开始我以为只需要配置 max.poll.record 就可以控制每个批次的消费速率,但经过测试发现,由于每次消费到的 records 都让线程池去处理了,因此 consumer.poll 一次数据在一个批次内就识别到很快就处理完,然后 consumer 就会在一个批次内尽可能地去 poll 多几次。这样就没法实现每个批次控制了。

    请教大家:针对以上情况有优化的可能性吗?需要尽可能精确控制指定 topic 一个批次内只需要消费固定的数据量,我目前发现 sparkStreaming 倒是很好地控制,但是 Java 目前没找到合适的方案来实现控制。
    第 1 条附言  ·  351 天前
    目前是通过 guava 组件的 RateLimter 限流器来实现特定限流。 通过设置 RateLimter.create(1),即 1 秒处理 1 次。 通过这个机制来限流控制特定的 consumer 在进行 poll
    if(rateLimter.tryActive()) {
    consumer.poll();
    for (record:records) {
    // 丢到线程池异步业务处理
    }
    }
    44 条回复    2023-12-28 15:39:47 +08:00
    codedreamstar
        1
    codedreamstar  
       351 天前
    有没有一种可能, 你的 Java 应用=线程池, 创建的消费者数量=线程数, topic=任务队列, 所以在消费者这里再加一个内存的线程池是为了什么?
    至于控制 kafka 消费者查一下 pause 相关的 api, 能暂停或者说挂起消费者
    diagnostics
        2
    diagnostics  
       351 天前
    Kafka Consumer 不一直都是需要写自旋逻辑不断拉取线程吗?写个计数器和计时器不就完了,超出就 sleep 一下
    diagnostics
        3
    diagnostics  
       351 天前
    2 分钟写了个案例,如果要更精细用 nanoTime
    ```java
    int MAX_CNT_OF_SEC = 100;
    int count = 0;
    long lastPoll = -1l;
    long ONE_SEC = Duration.ofSeconds(1L).toMillis();
    while (true) {
    long now = System.currentTimeMillis();
    long diff = now - lastPoll;
    if (diff < ONE_SEC || count >= MAX_CNT_OF_SEC) {
    Thread.sleep(ONE_SEC - diff);
    }

    ConsumerRecords<String, String> records =
    consumer.poll(Duration.ofMillis(100));

    count += records.count();

    // TODO
    }

    ```
    lsk569937453
        4
    lsk569937453  
       351 天前   ❤️ 1
    新需求:现需要新增消费一个 topic ,该 topic 的量比存量的 topic 要远远大于。因此需要控制一下该 topic 每个批次消费到的数量

    先把问题表述清楚吧。。。。。你自己看看你说的是啥

    两个不同的消费者消费不同的 topic ,这俩之间没任何逻辑上的关系的,所以你限制数量是为了啥????????
    diagnostics
        5
    diagnostics  
       351 天前
    @diagnostics #3 发完发现漏写 count 归 0 和 更新 lastPoll 了,思路差不多
    wqhui
        6
    wqhui  
       351 天前
    没看懂,是因为应用使用线程池处理数据,而拉取线程只管拉取,导致拉取过快,消费跟不上,把应用内存弄爆?这样理解的话可以在拉下来数据推线程池处理的时候限流,阻塞拉取线程
    Tongwin
        7
    Tongwin  
    OP
       351 天前
    @codedreamstar 不好意思之前没讲清楚,创建内存的线程池主要是为了异步处理消费者数量,consumer.poll 是不受内存线程池影响的。
    @diagnostics 感谢提供思路,我之前也有考虑过用 sleep 来控制 poll 的次数,但考虑到实例多、消费 Topic 数量多等复杂情况,没有深入了解就用 sleep 感觉不太稳妥。
    @lsk569937453 不好意思没有表达清楚需求。 由于存量 topic 推送过来的数据量并不大,因此目前并没有做任何限速处理,现有应用就是尽可能地去消费很多数据(依赖线程池异步处理 record)。 然后新 topic 由于数据量远远大于存量 topic 的数据量,如果不作消费限制的话,对于后续的业务处理是有着极大的压力和风险的。
    Tongwin
        8
    Tongwin  
    OP
       351 天前
    @wqhui 没错,就是大佬你说的意思。目前我考虑到通过使用限流器来限定特定的 consumer 1 秒只 poll 一次
    ZZ74
        9
    ZZ74  
       351 天前
    自己写代码调用 poll ,可以定时,也可以用 future 控制上一批消费完了再 poll ,注意 rebalance

    如果你要控制所有消费者和对应 topic 一起的单批次数量,那就麻烦了。
    Tongwin
        10
    Tongwin  
    OP
       351 天前
    @ZZ74 谢谢大佬提供思路。我的想法是,只需要计算好 topic 对应的分区,多实例消费 topic 的时候,只有获取到分区的实例才能消费到数据。计算一下分区数*限流量应该就可以得到想要的结果了吧?
    diagnostics
        11
    diagnostics  
       351 天前
    @Tongwin #7 RateLimiter 和 sleep 没区别,重点不是 sleep 而是限流思路,sleep 只是让出 CPU 时间片
    iX8NEGGn
        12
    iX8NEGGn  
       351 天前   ❤️ 1
    限流五算法:固定窗口、滑动窗口、滑动日志、漏桶、令牌桶,总有一款适合你
    tomorrow092
        13
    tomorrow092  
       351 天前
    @wqhui

    我对 kafka 不了解, 作为头部 mq 产品,他内部没有默认的 消费者本地流控策略?自己能把自己撑死?




    我感觉上猜测这种消费流控很常见的需求,大概率上应该只需要配置参数就能实现。多看看其他配置参数。

    fetch.max.bytes:单次获取数据的最大消息数。
    max.poll.records <= 吞吐量 :单次 poll 调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。默认值为 500
    lsk569937453
        14
    lsk569937453  
       351 天前   ❤️ 3
    有没有可能根本不需要限流。
    kafka 本来让业务主动去拉取,就是让你在拉取的时候控制速率。

    fetch-max-wait: 10000 # poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
    fetch-min-size: 10 # poll 一次消息拉取的最小数据量,单位:字节
    max-poll-records: 100 # poll 一次消息拉取的最大数量

    可以完全通过这三个参数控制你的消费速率,直接同步消费就是最好的选择。你却本末倒置,做异步消费然后再限流。本来配置修改一下就可以的事情,你却写一堆代码,把简单的事情搞复杂。
    ZZ74
        15
    ZZ74  
       351 天前
    @Tongwin
    无法理解你设计的时候为什么会有没分配到分区的消费者...
    这和分区无关,每个 consumer 都负责 1...N 个分区,简单的就是确保每个 consumer 在上一批完成后再拉取。
    你上一次拉了 500 只 commit 了 200 条,consumer 内部也不会让你本地堆积 800 条。
    @tomorrow092
    OP 的问题很有可能是把拉取到任务提交到线程池后就 commit offset 了,对于 kafka broker 和 consumer 来说就是消费完了。
    所以就导致了线程池任务还在跑,consumer 又拉一批过来...然后满了 爆了... rebalance 了等等....
    Tongwin
        16
    Tongwin  
    OP
       351 天前
    @lsk569937453 这个项目我是从前辈那里接过来的。 目前已经在线上稳定运行一段时间。目前是不适宜在短时间内重构它。只想着在现有的情况下,特殊处理一下这个量大的 topic ,这个 topic 后续会下线掉。
    @ZZ74 其实就是应用部署在云上有多个实例,每个实例在创建的时候都会尝试去创建 consumer 获取分区。由于都是用同一个消费者组,最终也就只有 topic 分区数的实力能够获取到该 topic 的其中一个分区,我这样理解是没问题的吧?
    zhaogaz
        17
    zhaogaz  
       351 天前
    你怕 consumer 处理的时候,把下游搞弄挂了,然后就尝试去限制 mq ?

    这个头疼医头,脚疼医脚的做法感觉不太好。

    你说 [后续的业务处理是有着极大的压力和风险的。] ,那么我认为正确的做法是,改动 [后续的业务处理] 的 api 或者是类库,你加一个限流好了。。。而不是去限制 mq

    如果有一天,后续业务修改好了,谁能知道限流再 mq 上?谁又能看懂你那一坨代码?
    dd31san
        18
    dd31san  
       351 天前
    个人小白,kafka broker 似乎可以直接配置每秒字节数 Quotas 。
    直接在消费者上,控制消息条数,思路上也许可以:
    1 记录轮次 poll 开始时间 begin ,count 消息数量
    2 当 count 达到 limit 时,用 pause 方法暂停避免 rebalance
    3 手动提交 offset
    4 判断时间,超 过 1s 重置 count 和 begin ,若暂停中则调 resume 恢复
    Tongwin
        19
    Tongwin  
    OP
       351 天前 via Android
    @zhaogaz 目前是优先处理一下特殊情况,后续我们会排期在流量出口进行限流。流量入口的限流我们也是有计划排期优化的。感谢
    liuhan907
        20
    liuhan907  
       351 天前
    看起来你的消费者是一个单进程?那样的话我觉得写一个全局的令牌桶然后按请求限流那样做比较方便,kafka pull 消息的时候是可以指定最多拉多少个的。
    Tongwin
        21
    Tongwin  
    OP
       351 天前 via Android
    @dd31san 感觉思路是可行的。不过现阶段 consumer 配置是 auto.commit 自动提交偏移量的。如果改成手动提交偏移量,得重新评估影响范围了。
    Tongwin
        22
    Tongwin  
    OP
       351 天前 via Android
    @liuhan907 是的,我现在就是用令牌桶来实现 1 秒只 poll 一次,设定 poll 的最大数量来实现
    opengps
        23
    opengps  
       351 天前
    我怀疑我没看懂这个题目,为啥别人都在设法提速,而 op 却在设法限速降速呢?
    liuhan907
        24
    liuhan907  
       351 天前
    而且还可以用 redis 平滑迁移到多进程消费者,我认为是最优解了。
    ZZ74
        25
    ZZ74  
       351 天前
    @Tongwin
    设计时,同一个消费者组,一般一个消费者能分到至少一个分区。建议你看看项目代码,确定消费者和分区数量配比
    flmn
        26
    flmn  
       351 天前
    我猜你是不是想,poll 一次多拿几条数据一起处理,假设 100 ,而不是一条条拿?
    因为我看你在设置 max.poll.record 参数。
    这里面有一个小坑,光设置 max.poll.record 没用,你看一下 fetch.min.bytes 参数,设大一点试试。
    另外,fetch.max.wait.ms 也要设置合适,这样,时间到了有多少条拿多少条。这样既实现了批量,也保证一定的实时性。
    Takamine
        27
    Takamine  
       350 天前 via Android
    我很好奇你们这个多线程消费加自动提交的实现里需要提交的这个 offset 是怎么确定的,还是做了消费端幂等?
    ymz
        28
    ymz  
       350 天前
    当使用 Kafka 时,可以针对不同的 topic 设置不同的消息拉取数量。这可以通过配置 Kafka consumer 的属性来实现。在 Spring Boot 中,可以使用 @KafkaListener 注解来监听指定的 topic ,并且可以为每个 @KafkaListener 注解配置不同的消费者属性。

    要为不同的 topic 设置不同的消息拉取数量,可以按照以下步骤进行操作:

    创建不同的 Kafka consumer 配置类,每个配置类对应一个 topic 。
    在每个配置类中设置不同的消息拉取数量。
    在消费者服务中,使用 @KafkaListener 注解指定要监听的 topic ,并引用相应的 Kafka consumer 配置类。
    codedreamstar
        29
    codedreamstar  
       350 天前
    如果消费者处理的消息是无关的, 那么每个消费者消费单个消息只需要加分区和消费者数量
    如果消费者处理的消息是相关的, 也就是需要一批一起处理的(如果相关,在生产者就应该打包成一条消息), 那为什么又加个线程池并行跑..
    暂且不论是否合理, 你的 offset 是怎么提交的, 在批量用线程池的情况下?
    如果是等待这批消息处理完统一提交, 那么通过限制线程池能到达限制效果
    如果是丢到线程池中后直接提交, 直接改方案, 这个方案基本等于错误设计


    技术上通过各种方式都能实现目标, 但是真的需要在技术面解决吗?
    如果方便可以发下业务场景一起探讨一下
    codedreamstar
        30
    codedreamstar  
       350 天前
    kafka 本身就是 poll 模型, 处理速率和并发度都是预先设置的, 理论上不搞花活是不会出现需要控制消费者消费速率问题的, 极大概率是错误设计或者滥用了
    XepMCWEKZ76L695l
        31
    XepMCWEKZ76L695l  
       350 天前
    用谷歌 guava 的 RateLimiter 限速即可
    XepMCWEKZ76L695l
        32
    XepMCWEKZ76L695l  
       350 天前
    不建议在 poll 这里做限流,很蛋疼
    Tongwin
        33
    Tongwin  
    OP
       349 天前
    @1Q1 目前我就是用谷歌 guava 的 RateLimiter 来限制指定时间最多 Poll 几次
    Tongwin
        34
    Tongwin  
    OP
       349 天前
    @flmn 你好,你提到的小坑,设置 max.poll.record 的同时是需要配合 fetch.min.bytes 使用是吧,我理解的是,如果一条数据本身不小,fetch.min.bytes 应该是有一个默认值,如果 max.poll.record*单条数据的大小 > fetch.min.bytes 默认值,实际还是按照默认值可获取的数量来获取吧。
    Tongwin
        35
    Tongwin  
    OP
       349 天前
    @Takamine 应用里并不需要严格关注 kafka 自动提交 offset 与处理完 records 的数目一致。 目前设置的 auto.commit.interval.ms 是 1 秒,而且应用也有手动每秒往 redis 里写入当前读写的 offset 。
    Tongwin
        36
    Tongwin  
    OP
       349 天前
    @ymz 感谢大佬提供 springboot 注解的思路,目前应用并不是依赖 springboot 框架搭建的,但后续是有升级到 springboot 框架的需求的。后续在应用需要迁移重构的时候,我会着重构思注解的可行性可实现方式。
    Tongwin
        37
    Tongwin  
    OP
       349 天前
    @codedreamstar 你好大佬,应用本身设计就是为了尽可能多消费来使用多线程实现的。 目前多线程主要是用来处理数据,且消费者处理的消息是无关的,提到 offset 提交,其实在 poll 到数据后,就先手动把 Offset 保存到 redis 里,然后配置 auto.commit.interval.ms=1 秒去自动提供 offset ,拿到的数据是直接丢到多线程里去异步处理了,应用不需要关注到当前批次的 records 处理完后才更新 offset 。这一点并不是很关注,主要是后续应用处理数据的时候会有各种机制把数据丢到 redis 里,成功的失败的处理都丢到 redis 里。
    Tongwin
        38
    Tongwin  
    OP
       349 天前
    @codedreamstar 我大概讲一下场景出来吧。 应用 A 设计之初并没考虑到那么长远,初衷也是能消费多快就消费多快。因此就用上了多线程异步处理数据。 处理数据这块其实也只是为了把数据存到 redis 里。 然后我们有另外的进程去从 redis 的队列里拿到数据,然后把这些数据再下发到下游(通过调用下游接口,简称应用 B )。 目前消费的 topic 都是推过来的实时数据,因此各项的 tps 都能够满足;不过应用 B 是有一个峰值的 tps 的。之前来了个需求,新接入一个 topic (简称(topic-new),topic-new 推过来的量是固定的,我这边撑这块业务为:存量初始化。 之前协商好上游提供 topic 过来的时候是控制速率的(因此原本我这边不用考虑限速限流的),后来因沟通问题上游又不作限速处理,最终限速操作只能在应用 A 这边进行。
    针对限速这块其实我是有过几个思考方案的
    方案一:直接搞一个 Spark 应用来进行存量初始化,Spark 在控制批量消费还是很好控制的
    方案二:使用令牌桶对应用 A 特殊的 Consumer 进行限流
    方案三:对应用 A 的流入和流出都作限流操作(后续一定会排期对数据流出作限流操作,但是听各位大佬的建议,好像并不推荐对流入数据也作限流操作)
    综合考虑各种因素,目前是考虑使用方案二进行限流操作,当完成存量初始化之后就可以下线该 topic 了,后续先实现流出的限流功能,其他功能再考虑可行性。
    Tongwin
        39
    Tongwin  
    OP
       349 天前
    @flmn 我之前可能理解错意思了, 但我还是有点疑惑, 如果我设置 max.poll.record=1000 ,fetch.min.bytes 默认值是 1 ,你说的小坑是什么场景呢? 我理解的是只要有数据就会获取, 一次 Poll 最多拿 1000 条,如果不足 1000 条就拿剩余的条数回来。
    flmn
        40
    flmn  
       349 天前
    如果 fetch.min.bytes 设的太小,即使 fetch.min.bytes 设的再大,可能有几条数据,就取回来了,达不到“搓堆儿”的效果,你可以写个程序测试一下,加深理解。
    Tongwin
        41
    Tongwin  
    OP
       349 天前
    @flmn hi 大佬,我昨天就已经在测试了,配置大概是 max.poll.record 设置为 200 ,fetch.min.bytes 使用默认值。 通过限流打日志查看,每一次 poll 都是 200 。不过是在本地单实例跑的。 我大概懂你意思了,你说的情况应该是,我在消费的同时,上游也在造数据。如果我的消费速度超过生产速度,那么确实会出现,上游推来一条,我就消费 1 条的情况。
    codedreamstar
        42
    codedreamstar  
       349 天前
    你是在自己封装类似 Spring for Kafka 这样的框架吗? 那就照着 Spring for Kafka 的设计思路抄, 按照你们的需求精简一下.

    不要在框架侧依赖这些配置以达到既定的框架逻辑, 这些配置都是业务侧来根据业务情况配置的, 你在框架层面能获取到的信息必定小于业务侧, 这些配置是给业务侧介入底层的手段.

    框架需要做的是隐藏消费者创建的细节, 消息路由到消息处理方法的细节, 获取\提交消息的细节等各种可以封装的细节, 提供给业务侧封装好的接口或者使用方法就好.

    你的设计应该是为每个消费者创建一个线程, 这个线程死循环 poll 以及 poll 之后对消息的路由以及处理, 消费完自动就该 poll, poll 之后就开始消费, 根本不存在需要限制速率的地方, 消费速度就是速率, 需要提高并发度只需要控制创建的消费者数量就行(当然要有对应的分区数量).

    我看你的文章应该是给每一个消费者配了一个线程池, 路子是错的, 先不谈速率问题, 在业务侧按分区顺序消费都已经没有办法了, 同一个分区的消息都被线程池给并行了.

    如果有我误解的地方欢迎你再回复我.
    codedreamstar
        43
    codedreamstar  
       349 天前
    我页面一直没刷新, 没看到你的回复, 继续按你的场景回复.

    你的中间进程下发 B 本身就是有限流逻辑的吧. 否则按照 A 与中间进程通过 Redis 交互本身就会造成与中间进程对 B 这个链路的生产消费速率失调.

    如果中间进程只是获取数据并下发应用 B, 不涉及对数据在加工, 直接把中间进程和 Redis 砍掉, 这个下发逻辑合并到消费者的消费逻辑.

    如果中间进程负责数据加工再下发, 那就把 Kafka 的逻辑合并到那个中间进程的应用上, 再按照上述方法.

    如果现在应用 A 和中间进程不能合并, 那么就把 Redis 砍掉, 使用同步调用 A-中-B 的方式, 中间失败重试就行, 或者 A 与中间进程不使用 Redis, 也换成 Kafka, 以免生产速率过高 Redis 爆内存, 中间进程与 B 使用上述方法, 超过 TPS 报错就重试, 中间加个等待步进或者限流器.

    最好的情况就是沟通应用 B, 让 B 提供异步接口, TPS 问题让他们内部解决, 你就可以看情况把中间没用步骤都砍掉.
    Tongwin
        44
    Tongwin  
    OP
       342 天前
    @codedreamstar 大佬真的万分抱歉,1 周后才来回复你信息。 最近杂事缠身。我们用 kafka 并不是自己封装的,也是使用正常的依赖,由于本身架构问题,所以没法使用 Spring for kafka 。不过后续我们需要重构这个项目,后面以 SpringBoot 来框架进行搭建就会用 Spring for kafka 去设计了。 尴尬的是中间进程下发 B 是没有限流逻辑,我们后面会优先开发这一块。之前的数据的实时速率都没有达到应用 B 的峰值。
    后面应用重构,确实是考虑过把 redis 砍掉,原有 redis 功能是为了保证数据不丢失(比如应用 B 处理失败,应用 A 有相关的重发机制),后续重构我的想法是砍掉 redis , 消费 topic 进而下发数据,如果应用 B 处理失败,则应用 A 把失败的数据推送到另一个 topic-C 。 应用 A 继续消费 topic-C 的数据来实现重发机制。
    感谢大佬提供的思路,让我对 kafka 以及项目设计有了进一步的认识。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1067 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 19:15 · PVG 03:15 · LAX 11:15 · JFK 14:15
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.