V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
NoKey
V2EX  ›  程序员

请教, kafka 如何做到一个 topic 分发不同的类型的消息

  •  
  •   NoKey · 338 天前 · 2002 次点击
    这是一个创建于 338 天前的主题,其中的信息可能已经有所发展或是发生改变。
    场景是这样的,上游服务 A ,通过 kafka 发消息个下游服务 B,C,D

    为了后续集成方便,A 使用了一个 Topic

    这个时候,需要 BCD 接收自己的消息

    这种场景下,如何才能控制 BCD 只收到自己的消息,不收别人的消息呢?

    考虑了几种方式:
    1. 通过 key 。这样下游服务只有收到消息之后才知道 key 是啥,不是自己的丢弃,但是这样必须收消息,也就是 B 会收到 C ,D 的消息,感觉不好。
    2. 通过分区。不同下游的消息放到不同的分区,但是这样会造成分区不均衡,部分分区过大。

    请问一下大家有没有更好的办法呢?谢谢
    25 条回复    2023-04-25 16:46:13 +08:00
    antipro
        1
    antipro  
       338 天前 via Android
    给 B ,C ,D 各建一个 Kafka
    aijam
        2
    aijam  
       338 天前
    1F +1
    cloudzhou
        3
    cloudzhou  
       338 天前
    给 B ,C ,D 各建一个 Topic 就可以
    dddd1919
        4
    dddd1919  
       338 天前
    如果只让 BCD 接收到自己的消息,那就在 push 时分三个 topic ,直接把消息隔离开,缺点就是负载可能不均服务利用率降低
    如果只让 BCD 处理自己要的消息并忽略掉无意义消息,可以在各自 consumer 加 filterStrategy 过滤掉无关消息
    NoKey
        5
    NoKey  
    OP
       338 天前
    建多个 topic 的麻烦点就是,后续要不断的增加 topic ,有没有办法,一个 topic 就可以解决呢?😂
    ChaYedan666
        6
    ChaYedan666  
       338 天前
    @NoKey 不可能吧,不论怎么说,只要是都发一个 topic ,那么 BCD 就得把里面的消息拉过来做过滤,过滤后再消费自己的;或者另一种就是你自己说的第二种,同一个消费者组,监听不同分区,根据 key 发不同的分区,分区不均衡啥的就你得自己控制了
    wuYin
        7
    wuYin  
       338 天前 via iPhone
    也许可以用 2 个 kafka 集群,A 写集群 1 ,自己写个 connector 做消息解析与分发,写到集群 2 的三个 topic ,再由 B C D 各自消费。
    这种做法引入了新的集群和组件,成本和维护代价更高。可行但不建议
    securityCoding
        8
    securityCoding  
       338 天前 via Android
    kafka 不好做,换阿里云 rocketmq 加 tag
    kaddusabagei38
        9
    kaddusabagei38  
       338 天前
    建议换队列
    urnoob
        10
    urnoob  
       338 天前
    B C D 各自作为一个消费者组。
    waitwait365
        11
    waitwait365  
       338 天前
    用 rabbitmq
    zgzhang
        12
    zgzhang  
       338 天前
    kafka stream 做个任务来处理
    WhereverYouGo
        13
    WhereverYouGo  
       338 天前
    在消息体里定义 business_type: B 、C 、D ,然后引进一个中间层 X ,X 直接消费 A 发送的消息,并根据 business_type 决定调用( HTTP 或 RPC ) B 、C 、D 。(计算机科学中的每个问题都可以用一间接层解决 doge )
    WhereverYouGo
        14
    WhereverYouGo  
       338 天前
    但是上述方案有个问题:B 、C 、D 直接接受流量的冲击,没有 MQ 来缓冲,服务可能会被打爆
    fkdog
        15
    fkdog  
       338 天前
    明明有现成的高速公路,多建两个 topic 的事,你非得要自己单独再修一条路。我不知道怎么评价你这个需求。。
    “为了方便”,请问改成 3 个 topic 不方便在哪里?
    awinds
        16
    awinds  
       338 天前
    除非你真的有需求有另外的 E 同时消费所有数据,不然就多个 topic 吧
    lower
        17
    lower  
       338 天前
    @WhereverYouGo 感觉问题不大,X 其实已经在 mq 后面了,慢慢一个一个取消息就行
    Super8
        18
    Super8  
       338 天前
    可以在消息的 key 或者 value 中添加标识,例如在消息的 key 中添加 B 、C 、D 等标识,表示该消息是发给 B 、C 、D 的,然后在消费者端使用带有过滤条件的消费者来消费消息,只消费自己需要的消息。具体可以使用 Kafka 的 Consumer API 提供的 subscribe 方法中的参数来实现,例如使用 subscribe(Collections.singleton(topic), new MyPartitionAssignor()) 方法,其中 MyPartitionAssignor 实现了 PartitionAssignor 接口,可以根据标识来分配分区。另外,也可以使用 Kafka Streams 来实现消息过滤和分发。
    Super8
        19
    Super8  
       338 天前
    rocketmq 中 tag 最适合这个场景
    zhaoyy0513
        20
    zhaoyy0513  
       338 天前
    @Super8 我创建的 KafkaConsumer 用到的 api 里面没有这两个参数的方法啊老哥,你说的这个 kafka 是哪个版本的啊
    zhaoyy0513
        21
    zhaoyy0513  
       338 天前
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    import java.util.Properties;

    public class KafkaStreamsExample {
    public static void main(String[] args) {
    // 设置 Kafka Streams 属性
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // 创建 Kafka Streams 实例
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> stream = builder.stream("topic-A");

    // 根据消息的 key 将消息路由到不同的分区中
    stream.selectKey((key, value) -> key)
    .through("topic-A-shuffle")
    .groupByKey()
    .foreach((key, value) -> {
    // 处理消息
    System.out.println("Processed message: " + value);
    });

    // 将处理后的消息发送到下游服务
    stream.mapValues(value -> "processed " + value)
    .to("topic-B", Produced.with(Serdes.String(), Serdes.String()));

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
    kafkaStreams.start();
    }
    }

    在上面的代码中,首先使用 selectKey()方法将消息的 key 作为新的 key ,然后使用 through()方法将消息发送到一个新的 Topic 中,这个新的 Topic 会使用 Kafka 默认的分区策略将消息路由到不同的分区中。然后,我们使用 groupByKey()方法将同一个 key 的消息分组,确保每个消费者只消费自己需要的消息。最后,我们使用 foreach()方法处理分组后的消息,并使用 mapValues()方法将处理后的消息发送到下游服务。

    需要注意的是,使用分流操作可能会导致数据倾斜(data skew)问题,因为某些 key 的消息可能比其他 key 的消息更频繁,从而导致某些分区比其他分区拥有更多的消息。为了解决这个问题,可以使用一些分区策略(partitioning strategy),例如随机分配、循环分配、哈希分配等。
    burymme11
        22
    burymme11  
       338 天前
    可以中间自己加一个路由层。
    新建一个中间层 AA ,来监听 topic ,处理上游服务 A 的消息,在 AA 里面,自己写代码做负载均衡,比如根据消息 ID 取模,给 B ,C ,D 分配好不同的 key ,最后所有消息再往新的 NewTopic 里丢。这样 B ,C ,D 就监听 NewTopic 就行,以后要加薪的下游服务,你只要改动 AA 层分发路由的代码就好。
    Dlin
        23
    Dlin  
       338 天前
    kafka 的 topic 和 rabbitmq 的 topic 不一样么。
    zhaoyy0513
        24
    zhaoyy0513  
       338 天前
    要实现上游系统 A 将消息发送到下游系统 B 、C 、D ,并确保每个下游系统只处理自己需要处理的消息,同时还要确保消息只被消费一次,可以采用以下方案:

    使用 Kafka 作为消息中间件,将上游系统 A 发送的消息发布到一个名为"topic-A"的 Kafka 主题中。

    在下游系统 B 、C 、D 中,创建三个不同的消费者组,分别为"group-B"、"group-C"、"group-D",并订阅"topic-A"主题。

    在消费者端,使用 Kafka 中的消息过滤器来过滤掉不需要的消息,只选择要处理的消息。可以使用 Kafka 中的消息键(key)来实现过滤。例如,下游系统 B 只想处理键(key)为"key-B"的消息,可以使用以下代码来实现:

    java
    Copy
    // 创建 Kafka 消费者
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "group-B");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

    // 订阅"topic-A"主题
    consumer.subscribe(Collections.singletonList("topic-A"));

    // 消费消息
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    if (record.key().equals("key-B")) {
    // 处理消息
    }
    }
    consumer.commitSync();
    }
    ```

    为了确保消息只被消费一次,将消费者的 auto.offset.reset 属性设置为"earliest",并启用自动提交偏移量。这将确保消费者在启动时从最早可用的偏移量开始消费,以避免漏掉任何消息,并且将自动提交偏移量以确保每个消息只被消费一次。例如,可以使用以下代码来实现:

    java
    Copy
    // 创建 Kafka 消费者
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "group-B");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");
    props.put("enable.auto.commit", "true");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    ```
    使用上述方案,上游系统 A 可以将消息发送到"topic-A"主题中,下游系统 B 、C 、D 可以使用 Kafka 消费者订阅该主题,并使用消息过滤器来过滤掉不需要的消息,只选择要处理的消息。自动提交偏移量将确保每个消息只被消费一次。





    上面两条回复都是 chatgpt 回复的
    PythonYXY
        25
    PythonYXY  
       338 天前
    为什么不建多个 topic 呢,如果下游服务不固定可以做成配置式的啊
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   977 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 20:11 · PVG 04:11 · LAX 13:11 · JFK 16:11
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.