环境 pika 0.10 RabbitMQ 3.5.4, Erlang 18.0
生产者
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
queue_name="route_test"
channel.exchange_declare(exchange='logs')
channel.basic_qos(prefetch_count=1)
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
i = 0
while i < 10:
channel.basic_publish(exchange='logs',
routing_key=queue_name,
body=message,
properties=pika.BasicProperties(
delivery_mode=2
))
i += 1
print(" [x] Sent %r" % message)
connection.close()
消费者
def on_start():
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
queue_name="route_test"
channel.exchange_declare(exchange='logs')
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='logs',
queue=queue_name)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
return channel
def callback(ch, method, properties, body):
print(" [x] %r" % body)
time.sleep(3)
while True:
try:
channel = on_start()
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.start_consuming()
except:
print "connect error"
我已经在 channel 中设置了channel.basic_qos(prefetch_count=1)
,但是我如果先执行消费者,然后再执行生产者,就会一次性的把所有的消息都扔给消费者,这样,我如果手动再启动一个消费者,就无法获得还没有被执行的消息了。
请问,这是为什么,或者怎么样能够达到我想要的效果,即每次消费者都只获得一个消息,剩下的都保存在消息队列里面。
1
knktc 2016-03-30 15:12:08 +08:00 1
把 no_ack 设置为 false ,然后取一条消息 ack 一次
|
2
anexplore 2016-03-30 17:04:07 +08:00
1 楼 ok
|