Prefetch count--预取数量
2018-06-23 13:26:57来源:未知 阅读 ()
一、前言
前面提到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
二、事例
生产端:
# -*- coding: UTF-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 声明队列,并进行队列持久化 channel.queue_declare(queue='task_queue', durable=True) # 信息内容 message = "Hello,World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=2, # 消息持久化 )) print('[x] Sent %r' % message) connection.close()
消费端:
# -*- coding: UTF-8 -*- import pika import time import random connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 声明一个队列,并队列持久化 channel.queue_declare(queue='task_queue', durable=True) print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print('[x] Received %r' % body) time.sleep(random.randint(1, 20)) print('[x] Done') # 当消息处理完成后,主动通知rabbitmq,之后rabbitmq才会删除 # 队列中的这条消息 ch.basic_ack(delivery_tag=method.delivery_tag) # prefetch_count = 1 如果消费者中有一条消息没有 # 处理完,就不会继续给这个消费者发送新消息 channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') # 永远收下去,没有就在这卡住 channel.start_consuming()
可以运行多个consumer,让它们不断接受消息,可以看到只有当consumer中的消息处理完成了,才会接受下一个消息。
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
上一篇:scrapy框架第一章
- Prefetch count--预取数量 2018-06-18
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash