分布式计算--(分布式+多进程+多线程+多协程)
2018-08-05 07:51:15来源:博客园 阅读 ()
先来个最简单的例子:
把1-10000每个数求平方
服务器server:
用两个队列存储任务、结果
定义两个函数
要实现分布式得继承multiprocessing.managers.BaseManager
在主函数里multiprocessing.freeze_support()开启分布式支持
注册两个函数给客户端调用
创建管理器,设置ip地址和开启端口、链接密码。
用两个队列加任务、收结果。用刚刚注册的函数
把1-10000压入队列,
把结果压入队列
最后完成关闭服务器
客户端client:
也需要继承multiprocessing.managers.BaseManager
定义一个协程处理一个数据,同时把结果压入结果队列
定义一个线程处理10个数据,开启10个协程
定义一个进程,进程驱动10个线程
主函数:同客户端注册两个函数
同客户端创建管理器,设置ip地址和开启端口、链接密码。
链接服务器
同客户端调用注册的函数,两个队列
套四层循环:10个进程、100个线程、1000个协程
循环进程函数
上代码:
服务器server:
#coding:utf-8 import multiprocessing #分布式进程 import multiprocessing.managers #分布式进程管理器 import random,time #随机数,时间 import Queue #队列 task_queue=Queue.Queue() #任务 result_queue=Queue.Queue() #结果 def return_task(): #返回任务队列 return task_queue def return_result(): #返回结果队列 return result_queue class QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据 pass if __name__=="__main__": multiprocessing.freeze_support()#开启分布式支持 QueueManger.register("get_task",callable=return_task)#注册函数给客户端调用 QueueManger.register("get_result", callable=return_result) manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #创建一个管理器,设置地址与密码 manger.start() #开启 task,result=manger.get_task(),manger.get_result() #任务,结果 for i in range(10000): print "task add data",i task.put(i) print "waitting for------" for i in range(10000): res=result.get(timeout=100) print "get data",res manger.shutdown()#关闭服务器
客户端client:
#coding:utf-8 import multiprocessing #分布式进程 import multiprocessing.managers # 分布式进程管理器 import random,time #随机数,时间 import Queue #队列 import threading import gevent import gevent.monkey class QueueManger(multiprocessing.managers.BaseManager):# 继承,进程管理共享数据 pass def gevetygo(num ,result): #协程处理一个数据 print num*num result.put(num*num) def threadgo(datalist,result): # 线程处理10个数据,开启10个协程 tasklist=[] for data in datalist: tasklist.append(gevent.spawn(gevetygo, data,result)) gevent.joinall(tasklist) def processgo(ddatalist,result): # [[1,2,3],[4,5,6]] 进程驱动了10个线程 threadlist=[] for datalist in ddatalist: mythread=threading.Thread(target=threadgo,args=(datalist,result)) mythread.start() threadlist.append(mythread) for mythread in threadlist: mythread.join() if __name__=="__main__": QueueManger.register("get_task") # 注册函数调用服务器 QueueManger.register("get_result") manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") manger.connect() # 链接服务器 task= manger.get_task() result =manger.get_result() # 任务,结果 # 1000 # 10个进程 # 100个线程 # 1000个协程 for i in range(10): cubelist = [] # [[[1],[2]]] for j in range(10): arealist = [] for k in range(10): linelist = [] for l in range(10): data = task.get() linelist.append(data) arealist.append(linelist) cubelist.append(arealist) processlist = [] for myarealist in cubelist: process = multiprocessing.Process(target=processgo, args=(myarealist, result)) process.start() processlist.append(process) for process in processlist: process.join()
遇到的坑:一个月之前弄分布式的时候写ip地址怎么都开启不了,后来换了台电脑就支持了= =。
如果只是在自己电脑上弄的话,写127.0.0.1也可以运行,如果你也遇到ip地址怎么都开启不了的情况
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- python3基础之“术语表(2)” 2019-08-13
- Python连载30-多线程之进程&线程&线程使用 2019-08-13
- python_公共方法 2019-08-13
- python多线程同步实例分析 2019-08-13
- xpath+多进程爬取八零电子书百合之恋分类下所有小说。 2019-08-13
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash