V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
petelin
V2EX  ›  Kafka

准备写个基于 kafka 的延迟队列, 有感兴趣的吗

  •  
  •   petelin · 2019-05-29 17:53:27 +08:00 · 10475 次点击
    这是一个创建于 2009 天前的主题,其中的信息可能已经有所发展或是发生改变。

    解决的问题: kafka 不支持延迟队列

    如何解决: 如果是延迟小时, push 之前先放到 redis 里, 然后 work 通过 lua 轮训拿到需要真的 push 到队列里的请求, 然后 push 到 kafka 里.

    整个功能其实和 Python 的 celery 或者 Go 的 machinery 很像.但是前者需要单独部署项目太复杂, 后者不支持 kafka.

    有搞头吗?

    21 条回复    2021-03-29 20:35:34 +08:00
    jaylee77
        1
    jaylee77  
       2019-05-29 18:08:16 +08:00
    没有
    Takamine
        2
    Takamine  
       2019-05-29 20:13:15 +08:00 via Android
    这样的感觉……还不如就只用 redis,从 list 里面取出来直接拿到 redis 订阅发布_(:з」∠)_。
    Varobjs
        3
    Varobjs  
       2019-05-30 08:31:39 +08:00 via Android
    beanstalk 了解下
    petelin
        4
    petelin  
    OP
       2019-05-30 09:12:28 +08:00 via iPhone
    @Takamine 性能会有问题 分布式的东西还是应该让队列做
    petelin
        5
    petelin  
    OP
       2019-05-30 09:18:25 +08:00 via iPhone
    @Varobjs 多谢 跟 sqs 很像 不过看到基于内存的 可靠性可能要打折了 还没具体看 上班研究一下
    ebingtel
        6
    ebingtel  
       2019-05-30 09:21:26 +08:00
    有个疑问,真的用得上 kafka 的场景,为了延迟 N 分钟、塞进 redis,单机内存会爆的吧?
    petelin
        7
    petelin  
    OP
       2019-05-30 09:38:35 +08:00 via iPhone
    @ebingtel 还好 Redis 也可以不是单机 分库分表嘛 具体的消息不要存进 Redis 存个 ID 就行 主要是必须解耦 容灾 可扩展 消息还最好只消费一次 后面这些要求任何一个消息队列都比现在的 Redis 实现要好

    当然做玩具或者小公司不用考虑这些
    airfling
        8
    airfling  
       2019-05-30 09:42:54 +08:00
    延迟注定会有个问题是,如果并发足够大,你存储延迟时这些数据的地方会内存爆掉
    petelin
        9
    petelin  
    OP
       2019-05-30 11:41:43 +08:00
    @airfling 不一定存内存里啊, Redis 只是为了方便, 你存 MySQL 里, 加上索引, 不也可以吗
    nicoljiang
        10
    nicoljiang  
       2019-05-30 15:14:59 +08:00
    感觉是为了用新式的方案,出了一个坑,为了埋这个坑,又要用老的方案来弥补。
    mooncakejs
        11
    mooncakejs  
       2019-05-30 16:22:32 +08:00
    kafka 的特点是高性能,搞了这玩意就没有高性能了,那为什么不用直接支持定时的队列系统呢。
    其实高性能的队列中间件往往都不支持定时,或者有限支持。
    kafka 也可以参考有限支持的,只要支持有限的定时时长,还是很简单的。
    Feedline
        12
    Feedline  
       2019-05-30 17:15:40 +08:00
    为啥不用 rabbitmq ?
    petelin
        13
    petelin  
    OP
       2019-05-30 17:34:47 +08:00 via iPhone
    @Feedline 想用 公司不支持这个
    guagusi
        14
    guagusi  
       2019-05-31 09:10:24 +08:00
    时间轮了解一下
    petelin
        15
    petelin  
    OP
       2019-05-31 13:23:05 +08:00
    @guagusi 这个只是 kafka 内部需要延迟然后搞的一个东西, 没有暴露给生产者吧
    Damnever
        16
    Damnever  
       2019-06-02 11:48:39 +08:00
    用 Redis/MySQL 加个中间层就已经能做了一个延时队列了,仅仅是为了使用 kafka 的接口?

    据说 kafka 已经支持基于 timestamp 的消费了;如果非要用困难模式也是可以的,在 topic 上做文章,对过期时间点根据需求进行分段,每个分段对应一个 topic,然后对 producer 和 consumer 搞层封装,并不觉得数据量大的情况下这么玩 kafka 能不出问题
    petelin
        17
    petelin  
    OP
       2019-06-02 12:22:17 +08:00 via iPhone
    @Damnever 中间层能支持的 QPS 肯定没有算好了直接甩给 kafk 高对吧
    Damnever
        18
    Damnever  
       2019-06-02 12:40:36 +08:00
    @petelin 丢给 kafka 性能就高是什么逻辑,性能是由短板决定的
    petelin
        19
    petelin  
    OP
       2019-06-02 14:49:40 +08:00 via iPhone
    @Damnever 比方说只用 Redis 的话 有多少个 worker 就得 poll 多少次 这对 Redis 是一个挑战 不如只开很少的 worker 只进行分发消息 消息处理丢给 kafka
    另外 kafka 等消息队列可以支持 只处理一次 或者最少处理一次
    Damnever
        20
    Damnever  
       2019-06-02 17:36:22 +08:00
    @petelin 其实我不太明白你的具体场景和限制条件,但从你的描述来看我觉得这些其实都不是问题,再者又是基于什么理由判断对 Redis 是挑战的东西但是对 Kafka 就不是挑战了呢?当然不想重新造轮子觉得成本太高将各种系统的部分功能组合变成自己想要的功能也是可以的,但不管怎样都得造点东西.. (就我个人看来最简单干净的方法是使用支持延时队列的消息系统)
    bthulu
        21
    bthulu  
       2021-03-29 20:35:34 +08:00
    可以做固定时间点的延时重发.
    比如说, 1 秒, 10 秒, 30 秒, 1 分钟, 5 分钟, 1 小时, 8 小时, 24 小时延时等.
    针对每个延时时间创建一个队列, 生产者按延时需求将数据(数据里包一层最终要去的队列名)发送到对应队列.
    然后每个队列起一个消费者, 轮询数据, 到点发送到目标队列即可.
    ```
    headers.put('finalTopic', topic);
    producer.send(new ProducerRecord(delayedTopic, key, value, headers));
    ```

    ```
    // 60 秒延时队列
    int delay = 60_000;
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    long timeLeft = record.timestamp() + delay - System.currentTimeMillis();
    if (timeLeft > 0) {
    Thread.sleep(timeLeft);
    }
    var topic = record.headers.lastHeader('finalTopic')
    record.headers.remove('finalTopic');
    producer.send(new ProducerRecord(topic, key, value, headers));
    }
    }
    ```
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1036 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 20:11 · PVG 04:11 · LAX 12:11 · JFK 15:11
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.