1
maocat 2023-10-17 20:33:37 +08:00
消息 ID ,加日志
|
2
totoro52 2023-10-17 20:36:58 +08:00
消费确认
|
3
totoro52 2023-10-17 20:39:14 +08:00
改为手动确认模式,当消费者消费完成后手动触发确认,MQ 才会删除这条消息。
|
4
hidemyself 2023-10-17 21:07:57 +08:00
后处理呗,存本地表,定时任务扫描
|
5
oneisall8955 2023-10-18 00:22:51 +08:00 via Android
手动 ACK
|
6
tramm 2023-10-18 08:11:45 +08:00
那就不要异步呗...
手动提交的话,那就得等结果出来,跟同步处理没啥区别吧 |
7
xiaofan2 2023-10-18 08:45:36 +08:00
很明显你是没有开启手动提交 offset?
正常来说,你消息扔到线程池后应该返回一个 Future ,然后你需要等待你这个消费处理完成,再给 broker 返回提交成功 |
8
apisces 2023-10-18 08:52:47 +08:00
有个疑问,如果手动 ack 的话,那也需要等到线程池的 Future 返回才提交,这样和同步的区别在哪里呢
|
10
WashFreshFresh 2023-10-18 09:46:52 +08:00
@NoKey 处理完后手动 ack 比较好 没有 ack 的消息是会重发的
|
11
lx0319 2023-10-18 10:00:21 +08:00
去重机制。 或者设计容忍重复的方式,用于重复消费时,保持幂等
可以处理后记录 offset 和 partition ,真出现问题,从故障 offset 重取就好 kafka 重置 offset 也可以,另开一个 group ,单独消费这条数据也行。 |
12
venomD 2023-10-18 10:07:03 +08:00 1
1. 弄个死信队列,处理异常的直接丢到死信队列里
2. 丢到数据库里,mysql ,redis ?定时任务扫描处理,成功后删除 |
13
fengpan567 2023-10-18 10:19:11 +08:00
线程池异步处理有问题,消息如果在线程池的队列中,重启服务的时候数据不就丢了。还不如写到中间表,定时去捞未处理数据
|
14
lsk569937453 2023-10-18 10:27:53 +08:00
@fengpan567 重启服务的时候内存的数据是没有了,但是因为消息没有 commit ,所以可以从 mq 再次消费没有 commit 的消息。
|
16
julyclyde 2023-10-18 12:55:03 +08:00
消费和处理分开,这设计是不是有点问题啊?
是不是应该改成各线程分别消费并就地处理? |
17
jfds 2023-10-18 13:02:07 +08:00
不用线程池消息会堆积么,干嘛要增加消费速度。MQ 已经是异步的了,一般对 rt 不敏感,为了这一点速度搞异步链路引出新的问题不值得。
|
18
zhhmax 2023-10-18 13:02:32 +08:00 via iPad
办法很多,比如说消息 ID 放到 redis ,处理成功就去更新一下 redis 这个消息的自动过期时间等自动过期就行,一直没过期的就是处理失败的,如何确定一直失败呢,消费一次计数器加一即可。这样也不影响其他线程消费队列导致阻塞。失败到一定次数还可以加入其他逻辑人工干预。
|
19
Luckyshot 2023-10-18 14:03:26 +08:00
消息表+定时任务
|
20
rainbowStay 2023-10-18 14:17:59 +08:00
@zhhmax 即使消息 ID 放到 redis ,那消费失败了消息耶没法重发啊?
|
21
rainbowStay 2023-10-18 14:21:29 +08:00
@totoro52 #3 提个问题,如果不手动触发确认,MQ 是会阻塞还是会继续处理后面的其他消息?
|
22
zhhmax 2023-10-18 14:48:22 +08:00 via iPad
@rainbowStay 不用重发了,因为消息已经发成功了,有 ID 就能通过其他方式拿到完整消息的,要是消息再发一遍就重复了,这里和消费已经没关系了。楼主的异步处理消息的方式已经决定了不能再有重发消息的步骤,不然处理消息的那部分逻辑还得加上消息是否是处理失败的消息,增加代码复杂度。
|
23
w4n9hu1 2023-10-18 14:57:48 +08:00 via iPhone
加个 retry queue 和 dlq? 之前总结过一篇文章 https://w4n9hu1.dev/2023/05/26/exception-handling-in-event-driven-architectures/
|
24
rainbowStay 2023-10-18 14:59:15 +08:00
@zhhmax #22 根据 ID 能怎么拿到完整消息啊。。。那我理解要么就上游保留了关联 id 的消息信息,要么就是楼主在消费信息时自己保存一遍关联 id 的消息信息,这样不是更加大了复杂性吗?
|
25
zhhmax 2023-10-18 15:24:34 +08:00 via iPad
@rainbowStay 我们可以讨论更严谨一点,这个 kafka 如果连接的是两个不同的系统,那么你说的无法通过 ID 得到完整消息确实是个问题,保存 ID 的时候可以做到把这条失败的完整消息再保存到其他地方而不用再考虑重发问题。如果是上游的消息生产者也是自己内部系统,只能在消息队列中才能得知完整的消息内容而无法通过 ID 再从其他途径得到同样的内容在我个人看来是属于重大的设计缺陷,即便是这样,那也可以换个方式,新建一个队列把失败的消息放进去让消费原来消息队列的任务也监听一下这个队列就可以了,那么新的问题又来了,这样做到重发了,但是如果任务一直处理失败会不会放大数据量引发其他问题,比如说某段时间某批数据永远无法处理成功而一直重发会不会影响到其他批次正常消息的处理效率问题。
|
26
liprais 2023-10-18 15:28:31 +08:00
消费成功了再去更新 offset,不成功就重试呗
你不更新 offset 永远读的是这一条就完了 |
27
rainbowStay 2023-10-18 16:23:06 +08:00
给楼主找了下 Kafka 的文档: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html ,直接搜”consume a batch of records“定位到关键部分
|
28
fkdog 2023-10-18 16:32:27 +08:00
假设前提是,你的 kafka topic 允许乱序消费。
1. 一次从 kafka 里取 n 条数据,比如 100 条。 2. 然后 100 条丢异步处理。手动 ack 从第一条开始消息到第一条失败消息区间里最大的那条 offset 作为 ack 提交。比如 1 ~ 33 是成功的,34 是失败的,那么 ack 33. 3. 消费端做幂等处理。防止消息重复消费。 4. 对于重复发生错误的消息,做 retry n 次还是失败的话,单独记录人工介入。 |