V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
996635
V2EX  ›  Kafka

pykafka 写消息时速度极慢,只有 10 mgs / s 正常么?

  •  
  •   996635 · 2016-09-07 14:17:24 +08:00 · 7777 次点击
    这是一个创建于 3031 天前的主题,其中的信息可能已经有所发展或是发生改变。
    # coding:utf=8
    
    from pykafka.client import KafkaClient
    import logging
    import json
    import time
    
    logging.basicConfig(level= logging.WARNING)
    
    produce_logger = logging.getLogger('prodrcer')
    
    
    def kafka(use_rdkafka=False):
        client = KafkaClient('192.168.109.58:9092,192.168.109.70:9092,192.168.109.91:9092')
    
        produce_start = time.time()
        topic = client.topics['meteor_spider_article_dev']
    
        # producer = topic.get_producer(sync=True, use_rdkafka=use_rdkafka)
    
        msg_body = {
            'article_id': 1,
            "title": "标题",
            "subtitle": "副标题",
        }
        msg = json.dumps(msg_body)
        with topic.get_sync_producer() as producer:
            for i in range(0, 1000):
                producer.produce(msg)
    
        producer.stop()
    
        return time.time() - produce_start
    
    
    def calculate_thoughput(timing, n_messages=1000, msg_size=5956):
        print("Processed {0} messsages in {1:.2f} seconds".format(n_messages, timing))
        print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024)))
        print("{0:.2f} Msgs/s".format(n_messages / timing))
    
    if __name__ == '__main__':
        calculate_thoughput(kafka())
    
    

    Processed 1000 messsages in 76.68 seconds 0.07 MB/s 13.04 Msgs/s

    这速度 怎么回事?

    第 1 条附言  ·  2016-09-07 15:55:10 +08:00
    问题解决了, 在未使用 use_rdkafka 时, 默认的 linger_ms 很大, 会导致未在 min_queued_messages 内的 去做 sleep
    5 条回复    2016-09-07 15:50:51 +08:00
    sylecn
        1
    sylecn  
       2016-09-07 14:41:58 +08:00
    topic.get_sync_producer()

    虽然还没有用过 kafka ,但是这种压力测试应该都用 async 模式来发消息吧。如果用同步,起码开多线程一起发。
    要不然一个一个等反馈多慢啊。
    reAsOn
        2
    reAsOn  
       2016-09-07 14:47:30 +08:00
    用过 kafka-python 的库,性能可以接受,需要用异步发送 + batch
    996635
        3
    996635  
    OP
       2016-09-07 14:50:18 +08:00
    @sylecn
    @reAsOn

    我 python -m cProfile 运行了一下,发现居然会有 time.sleep 在调用栈里

    ```
    4064 0.002 0.000 0.002 0.000 {thread.get_ident}
    8 0.000 0.000 0.000 0.000 {thread.start_new_thread}
    6615 68.400 0.010 68.400 0.010 {time.sleep}
    8616 0.006 0.000 0.006 0.000 {time.time}

    ```
    est
        4
    est  
       2016-09-07 15:13:35 +08:00
    kafka 至少有 3 个 py 库,各自实现都不同。需要仔细判别。
    tongle
        5
    tongle  
       2016-09-07 15:50:51 +08:00
    Using the librdkafka extension 试试这个
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5070 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 01:14 · PVG 09:14 · LAX 17:14 · JFK 20:14
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.