python 之 并发编程(生产者消费者模型、守护进…
2019-07-24 09:29:00来源:博客园 阅读 ()
9.8 生产者消费者模型
该模型中包含两类重要的角色:
1、生产者:将负责造数据的任务比喻为生产者 2、消费者:接收生产者造出的数据来做进一步的处理的被比喻成消费者
实现生产者消费者模型三要素:1、生产者 2、消费者 3、队列
什么时候用该模型:
程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
该模型的好处:
1、实现了生产者与消费者解耦和
2、平衡了生产力与消费力,即生产者可以一直不停地生产,消费者可以不停地处理,因为二者不再直接沟通的,而是跟队列沟通,从而提高程序整体处理数据的速度
import time import random from multiprocessing import Process,Queue def consumer(name,q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res)) ? def producer(name,q,food): for i in range(5): time.sleep(random.randint(1,2)) res='%s%s' %(food,i) q.put(res) print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res)) ? if __name__ == '__main__': q=Queue() #1、共享的盆 p1=Process(target=producer,args=('egon',q,'包子')) #2、生产者们 p2=Process(target=producer,args=('刘清政',q,'泔水')) p3=Process(target=producer,args=('杨军',q,'米饭')) ? c1=Process(target=consumer,args=('alex',q)) #3、消费者们 c2=Process(target=consumer,args=('xxx',q)) ? p1.start() p2.start() p3.start() c1.start() c2.start()
9.81 守护进程的应用
问题:消费者c1和c2在取空了q之后,则一直处于死循环中且卡在q.get()这一步
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break退出死循环
import time import random from multiprocessing import Process,Queue ? def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res)) ? def producer(name,q,food): for i in range(5): time.sleep(random.randint(1,2)) res='%s%s' %(food,i) q.put(res) print('\033[45m生产者者===》%s 生产了 %s\033[0m' %(name,res)) #q.put(None) if __name__ == '__main__': #1、共享的盆 q=Queue() #2、生产者们 p1=Process(target=producer,args=('egon',q,'包子')) p2=Process(target=producer,args=('刘清政',q,'泔水')) p3=Process(target=producer,args=('杨军',q,'米饭')) #3、消费者们 c1=Process(target=consumer,args=('alex',q)) c2=Process(target=consumer,args=('梁书东',q)) ? p1.start() p2.start() p3.start() c1.start() c2.start() ? p1.join()# 在生产者生产完毕后,往队列的末尾添加一个结束信号None p2.join() p3.join() # 有几个消费者就应该放几个结束信号 q.put(None)#队列是共享的,主进程同样可以往队列里放None q.put(None)
升级版:设置守护进程,向队列发送结束信号,解决管道取空阻塞问题
JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
-
maxsize是队列中允许最大项数,省略则无大小限制
-
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
-
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
import time import random from multiprocessing import Process,JoinableQueue ? def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print('\033[46m消费者===》%s 吃了 %s\033[0m' %(name,res)) q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了 ? def producer(name,q,food): .......... if __name__ == '__main__': #1、共享的盆 q=JoinableQueue() #2、生产者们 p1=Process(target=producer,args=('egon',q,'包子')) p2=Process(target=producer,args=('刘清政',q,'泔水')) p3=Process(target=producer,args=('杨军',q,'米饭')) #3、消费者们 c1=Process(target=consumer,args=('alex',q)) c2=Process(target=consumer,args=('梁书东',q)) c1.daemon=True c2.daemon=True ? p1.start() p2.start() p3.start() c1.start() c2.start() ? p1.join()# 确定生产者确确实实已经生产完毕 p2.join() p3.join() # 在生产者生产完毕后,拿到队列中元素的总个数,然后直到元素总数变为0,q.join()这一行代码才算运行完毕 q.join() #q.join()一旦结束就意味着队列确实被取空,消费者已经确确实实把数据都取干净了 print('主进程结束')
原文链接:https://www.cnblogs.com/mylu/p/11228473.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
下一篇:python输出九九乘法表
- python3基础之“术语表(2)” 2019-08-13
- python3 之 字符串编码小结(Unicode、utf-8、gbk、gb2312等 2019-08-13
- Python3安装impala 2019-08-13
- 小白如何入门 Python 爬虫? 2019-08-13
- python_字符串方法 2019-08-13
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash