转载请注明出处:http://www.he10.top/article/detail/40
python多任务视频讲解:https://www.bilibili.com/video/BV1Ev411G7i3
python3官方对多任务提供了concurrent.futures模块,利用这个模块去写多任务非常简单。基本思路就是先单任务跑通,然后用任务池的submit方法去调单任务的函数即可。
多线程没什么好说的,就算有一些特殊需求也很容易满足;由于GIL的原因,大部分场景也是鸡肋。
最近在弄的算法二次分析项目,由于算法属于计算密集型任务,因此需要用到多进程,任务是串行的,进程创建放在接收到数据流后,循环调用运行中算法任务之前。
按平时的操作思路和写法是不可行的,因为进程是单独的资源空间,想要让不同进程对同一数据操作就要定义共享全局变量。
任务调度是以类方式实现,类中定义了些列表、字典存储一些信息,算法分析过程会对列表 字典数据做增删改查操作。
为了模拟任务类中存储列表、字典等情况,将数据流换成由cid和ctime组成,意指在一定时间范围内 相邻cid中有数据产出则pass掉(例:300秒内cid为0223有产出,则cid为0222-0224的数据需pass掉,不能产出),数据流更改如下:
1 def main(): 2 for i in range(100): 3 cid = random.randint(131000000201, 131000001299) 4 ctime = time.time() 5 tasks.event_info.put({"cid": cid, "time": ctime}) 6 time.sleep(1)
多任务下需求:
1.数据流未来的量会越来越大,需要使用队列存储以保证数据不丢失。
2.每个进程调任务类处理需是调的同一个任务类(对类的所有操作都要同步)
一、数据流队列
将 tasks中的event_info改成空队列,然后generate.py中的main函数中将tasks.event_info= 改为tasks.event_info.put,如下:
tasks.__init__.py
1 from multiprocessing import Queue 2 3 4 # 定义变量存储数据流 5 event_info = Queue()
generate.py
1 def main(): 2 for i in range(100): 3 cid = random.randint(131000000201, 131000001299) 4 ctime = time.time() 5 tasks.event_info.put({"cid": cid, "time": ctime}) 6 time.sleep(1)
最后在main.py中通过进程池调用generate的main函数,如下:
1 if __name__ == '__main__': 2 process = ProcessPoolExecutor(max_workers=4) 3 process.submit(generate.main) 4 while True: 5 if not tasks.event_info.empty(): 6 print(tasks.event_info.get())
发现:tasks中的event_info并不会增加数据,generate的main函数通过加打印可以知道put生效了,因此得知主进程中判断的tasks.event_info和子进程中put的tasks.event_info不是同一个队列。
解决方法:
1.通过线程调用generate的main函数,如下:
1 if __name__ == '__main__': 2 thread = ThreadPoolExecutor(max_workers=4) 3 thread.submit(generate.main) 4 while True: 5 if not tasks.event_info.empty(): 6 print(tasks.event_info.get())
2.不用进程池调用generate.main,而直接开一个进程调用generate.main,并将tasks.event_info作为参数传递给generate.main,如下:
1 from multiprocessing import Process 2 if __name__ == '__main__': 3 p1 = Process(target=generate.main, args=(tasks.event_info, )) 4 p1.start() 5 while True: 6 if not tasks.event_info.empty(): 7 print(tasks.event_info.get())
generate.py
1 def main(q): 2 for i in range(100): 3 cid = random.randint(131000000201, 131000001299) 4 ctime = time.time() 5 q.put({"cid": cid, "time": ctime}) 6 time.sleep(1)
3.队列改为Manager().Queue(),并作为参数传给generate的main,如下:
1 if __name__ == '__main__': 2 tasks.event_info = Manager().Queue() 3 process = ProcessPoolExecutor(max_workers=4) 4 process.submit(generate.main, tasks.event_info) 5 while True: 6 if not tasks.event_info.empty(): 7 print(tasks.event_info.get())
结论:
1.子进程使用的资源是主进程的一份复制资源,子进程对变量的操作不会影响主进程中的变量;子线程对变量的操作会影响主进程中的变量。
2.子进程间通信可以用multiprocessing.Queue(),进程池的子进程间通信需用multiprocessing.Manager().Queue(),都需将队列作为实参数传给子进程调用的函数内。
具体用哪种方法解看你实际使用情况,我这里是用flask_apscheduler任务调度框架调用generate.main生成数据流的,框架采用的多线程方式,因此这一步非常顺利。
二、任务类管理
先讲任务类,模拟一个算法逻辑,300S内如有相邻点位(自己加前后两点位)有数据产出,就pass掉不产出,则算法逻辑如下:
1 from demo_data_strem import tasks 2 from .eval_func import add_job_attr 3 4 @add_job_attr(job="123") 5 class Pass1(): 6 7 _pass_dict = {} 8 _max_time_interval = 5 * 60 9 10 def _update_pass_dict(self, cid, ctime): 11 for _cid in [cid - 1, cid, cid + 1]: 12 self._pass_dict[_cid] = ctime 13 14 def __call__(self, event_info, *args, **kwargs): 15 cid = event_info.get("cid") 16 ctime = event_info.get("time") 17 # print("处理的cid:", cid) 18 # print(self._pass_dict) 19 print(self._pass_dict) 20 if cid in self._pass_dict.keys(): 21 if ctime - self._pass_dict.get(cid) < self._max_time_interval: 22 self._update_pass_dict(cid, ctime) 23 return 24 25 self._update_pass_dict(cid, ctime) 26 return event_info 27 28 def main(): 29 tasks.middleware.append(Pass1)
记录一下这里用了空间换时间的方法。按常理当数据进来,需判断相邻点位是否有产出过,则需写一个三次的循环,判断三个点位(前一点位、该点位、后一点位)是否在_pass_dict中,时间复杂度是3O(n)。改进点的方法可以用set集合取三个点位和_pass_dict中所有点位的交集,交集数量可以是1-3,时间复杂度是O(n)-3O(n)。我这里当数据进来,直接将三个点位数据写入_pass_dict,那么仅需判断当前点位是否在_pass_dict中即可,时间复杂度为O(n)。
方便进程池调用,将循环调用middleware __call__方法封装成handle函数,如下:
1 def handle(event_info): 2 print(tasks.middleware) 3 for middleware_cls in tasks.middleware: 4 if hasattr(middleware_cls, "job"): 5 event_info = middleware_cls()(event_info) 6 if event_info is None: 7 return 8 9 if event_info is not None: 10 print(event_info)
直接在main.py中用进程池调用handle函数,如下:
1 if __name__ == '__main__': 2 thread = ThreadPoolExecutor(max_workers=4) 3 thread.submit(generate.main) # 生成数据流 4 thread.submit(init_pass1) # 将任务类Pass1添加进中间件列表 5 6 process = ProcessPoolExecutor(max_workers=4) 7 8 while True: 9 if not tasks.event_info.empty(): 10 event_info = tasks.event_info.get() 11 process.submit(handle, event_info)
结果:
1 {'cid': 131000000617, 'time': 1632192265.173249} 2 {'cid': 131000000365, 'time': 1632192266.1746938} 3 {'cid': 131000001195, 'time': 1632192267.177981} 4 {'cid': 131000000487, 'time': 1632192268.1818829} 5 {'cid': 131000000569, 'time': 1632192269.1828969} 6 {'cid': 131000000520, 'time': 1632192270.1834471} 7 {'cid': 131000001229, 'time': 1632192271.186166} 8 {'cid': 131000000696, 'time': 1632192272.1894841}
发现:线程调用的将任务类添加进中间件类列表中,在子进程内获取的中间件列表还是空的
解决方法:
翻文档知道可以用multiprocessing.managers.BaseManager()来创建共享全局对象,任务类对象应为唯一的,所以定义了一个字典存储这些任务类对象,字典格式:{任务类名称:任务类对象},如下:
1 if __name__ == '__main__': 2 thread = ThreadPoolExecutor(max_workers=4) 3 thread.submit(generate.main) # 生成数据流 4 thread.submit(init_pass1) # 将任务类Pass1添加进中间件列表 5 6 process = ProcessPoolExecutor(max_workers=4) 7 8 # 定义字典存储共享全局对象(直接存储运行中的任务类对象) 9 run_middleware_obj_dict = {} 10 while True: 11 if not tasks.event_info.empty(): 12 event_info = tasks.event_info.get() 13 for middleware in tasks.middleware: 14 if hasattr(middleware, "job"): 15 if middleware.__name__ not in run_middleware_obj_dict.keys(): 16 manager = BaseManager() 17 manager.register(middleware.__name__, middleware) 18 manager.start() 19 middleware_obj = getattr(manager, middleware.__name__)() 20 run_middleware_obj_dict[middleware.__name__] = middleware_obj 21 else: 22 # 注意:当任务不在运行,应该将任务类对象从run_middleware_obj_dict中删除 23 if middleware.__name__ in run_middleware_obj_dict.keys(): 24 del run_middleware_obj_dict[middleware.__name__] 25 26 # handle(event_info, run_middleware_obj_dict) 27 process.submit(handle, event_info, run_middleware_obj_dict)
由于经manager.register注册的任务类被封装了一层,已经失去原任务类的__call__方法了,所以将__call__方法改为handle,如下:
1 @add_job_attr(job="123") 2 class Pass1(): 3 4 _pass_dict = {} 5 _max_time_interval = 5 * 60 6 7 def _update_pass_dict(self, cid, ctime): 8 for _cid in [cid - 1, cid, cid + 1]: 9 self._pass_dict[_cid] = ctime 10 11 def handle(self, event_info, *args, **kwargs): 12 cid = event_info.get("cid") 13 ctime = event_info.get("time") 14 # print("处理的cid:", cid) 15 # print(self._pass_dict) 16 print(self._pass_dict) 17 if cid in self._pass_dict.keys(): 18 if ctime - self._pass_dict.get(cid) < self._max_time_interval: 19 self._update_pass_dict(cid, ctime) 20 return 21 22 self._update_pass_dict(cid, ctime) 23 return event_info 24 25 def main(): 26 tasks.middleware.append(Pass1)
handle函数(main.py中)修改如下:
1 def handle(event_info, run_middleware_obj_dict): 2 for middleware_obj in run_middleware_obj_dict.values(): 3 event_info = middleware_obj.handle(event_info) 4 if event_info is None: 5 break 6 7 if event_info is not None: 8 print(event_info)
结果:
1 {} 2 {'cid': 131000001139, 'time': 1632193144.410032} 3 {131000001138: 1632193144.410032, 131000001139: 1632193144.410032, 131000001140: 1632193144.410032} 4 {'cid': 131000000888, 'time': 1632193145.410145} 5 {131000001138: 1632193144.410032, 131000001139: 1632193144.410032, 131000001140: 1632193144.410032, 131000000887: 1632193145.410145, 131000000888: 1632193145.410145, 131000000889: 1632193145.410145} 6 {'cid': 131000000973, 'time': 1632193146.412236} 7 {131000001138: 1632193144.410032, 131000001139: 1632193144.410032, 131000001140: 1632193144.410032, 131000000887: 1632193145.410145, 131000000888: 1632193145.410145, 131000000889: 1632193145.410145, 131000000972: 1632193146.412236, 131000000973: 1632193146.412236, 131000000974: 1632193146.412236}
为了更好的让进程池中的子进程操作run_middleware_obj_dict,应当将run_middleware_obj_dict也定义为共享全局变量,即run_middleware_obj_dict = Manager().dict(),看文档Manager()创建出来的变量会自带锁,未验证。
附:当使用进程池调用init_pass1(将Pass1类添加到中间件列表)函数,然后直接在main.py中用进程池调用handle函数(main.py中),结果会是下面这样:
1 {} 2 {'cid': 131000000923, 'time': 1632149442.6484401} 3 {'cid': 131000000596, 'time': 1632149443.650462} 4 {131000000922: 1632149442.6484401, 131000000923: 1632149442.6484401, 131000000924: 1632149442.6484401} 5 {'cid': 131000000797, 'time': 1632149444.6532} 6 {'cid': 131000000509, 'time': 1632149445.656755} 7 {131000000922: 1632149442.6484401, 131000000923: 1632149442.6484401, 131000000924: 1632149442.6484401, 131000000796: 1632149444.6532, 131000000797: 1632149444.6532, 131000000798: 1632149444.6532} 8 {'cid': 131000000288, 'time': 1632149446.661807} 9 {'cid': 131000000763, 'time': 1632149447.6639402} 10 {131000000922: 1632149442.6484401, 131000000923: 1632149442.6484401, 131000000924: 1632149442.6484401, 131000000796: 1632149444.6532, 131000000797: 1632149444.6532, 131000000798: 1632149444.6532, 131000000287: 1632149446.661807, 131000000288: 1632149446.661807, 131000000289: 1632149446.661807}
发现:子进程可以获取到中间件列表的变动,但是会出现每两条数据仅一条被中间件类处理,猜想应该是调用init_pass1函数的子进程是知道中间件列表变动的,所以这个子进程去调用handle函数(main.py中)会被中间件类处理,而进程池中的其他子进程不知道中间件列表变动,所以不会被中间件类处理;由于flask_apscheduler框架执行任务是用子线程,这里的情况没有遇到所以没有深究,后续再补解决方案,如有了解的也可以评论区说下想法。
最后,理一下multiprocessing提供的共享全局变量吧
一、基本数据类型
1. str,int,float,bool:使用multiprocessing.Value()
2.tuple:使用multiprocessing.Array()
3.list:使用multiprocessing.Manager().list()
4.dict:使用multiprocessing.Manager().dict()
二、队列,单向通信
1.子进程间的消息队列:使用multiprocessing.Queue()
2.进程池的子进程间的消息队列:使用multiprocessing.Manager().Queue()
三、管道,双向通信:使用multiprocessing.Pipe()
使用这些共享全局变量时,都应将其作为实参传给子进程调用的函数。
======================
记一次更新
上面说的将run_middleware_obj_dict也定义为共享全局变量,middleware_obj为共享全局对象有误,multiprocessing创建出的共享全局变量是不支持套娃的,也就是run_middleware_obj_dict里面的值不能为全局共享对象,实测存一个键值对没问题,存两个就会报绑定错误。
其实使用也没必要套娃,只要保证真正传给子进程的变量是全局共享变量即可,因此这里可以将run_middleware_obj_dict定义为普通变量,middleware_obj为共享全局对象,传值的时候将字典中的所有value进行拆包,得到的所有middleware_obj传给handle函数即可。
main.py修改如下:
1 if __name__ == '__main__': 2 thread = ThreadPoolExecutor(max_workers=4) 3 thread.submit(generate.main) # 生成数据流 4 thread.submit(init_pass1) # 将任务类Pass1添加进中间件列表 5 6 process = ProcessPoolExecutor(max_workers=4) 7 8 # 定义字典存储共享全局对象(直接存储运行中的任务类对象) 9 run_middleware_obj_dict = dict() 10 11 # 进程锁,使用Manager().RLock(),可以多次申请锁 12 lock = Manager().RLock() 13 while True: 14 if not tasks.event_info.empty(): 15 event_info = tasks.event_info.get() 16 17 # 更新run_middleware_obj_dict 18 # run_middleware_obj_dict = update_run_obj_dict(run_middleware_obj_dict) 19 for middleware in tasks.middleware: 20 if hasattr(middleware, "job"): 21 if middleware.__name__ not in run_middleware_obj_dict: 22 manager = BaseManager() 23 manager.register(middleware.__name__, middleware) 24 manager.start() 25 middleware_obj = getattr(manager, middleware.__name__)() 26 run_middleware_obj_dict[middleware.__name__] = middleware_obj 27 else: 28 # 注意:当任务不在运行,应该将任务类对象从run_middleware_obj_dict中删除 29 if middleware.__name__ in run_middleware_obj_dict: 30 del run_middleware_obj_dict[middleware.__name__] 31 32 # handle(event_info, run_middleware_obj_dict) 33 # 将字典中的所有全局对象拆包传给handle函数 34 process.submit(handle, event_info, *run_middleware_obj_dict.values())
main.py中的handle函数:
1 def handle(event_info, *args): 2 for middleware_obj in args: 3 event_info = middleware_obj.handle(event_info) 4 if event_info is None: 5 break 6 7 if event_info is not None: 8 print(event_info)
题外记录:
1.要知道python运行会有一个主进程和主线程,子线程和主线程共享资源,子进程会有一份主进程copy的资源,但和主进程资源分开(非全局共享对象/变量)
2.操作数据时,多任务无论是多进程还是多线程,对数据的改动(增删)和对数据的遍历应该分任务隔开。遍历的同时数据发生增删操作会报错,可以使用deepcopy
Comments