记从与python多进程死磕后获得的收获

转载请注明出处:http://www.he10.top/article/detail/40

python多任务视频讲解:https://www.bilibili.com/video/BV1Ev411G7i3

python3官方对多任务提供了concurrent.futures模块,利用这个模块去写多任务非常简单。基本思路就是先单任务跑通,然后用任务池的submit方法去调单任务的函数即可。

多线程没什么好说的,就算有一些特殊需求也很容易满足;由于GIL的原因,大部分场景也是鸡肋。

最近在弄的算法二次分析项目,由于算法属于计算密集型任务,因此需要用到多进程,任务是串行的,进程创建放在接收到数据流后,循环调用运行中算法任务之前。

按平时的操作思路和写法是不可行的,因为进程是单独的资源空间,想要让不同进程对同一数据操作就要定义共享全局变量。

任务调度是以类方式实现,类中定义了些列表、字典存储一些信息,算法分析过程会对列表 字典数据做增删改查操作。

为了模拟任务类中存储列表、字典等情况,将数据流换成由cid和ctime组成,意指在一定时间范围内 相邻cid中有数据产出则pass掉(例:300秒内cid为0223有产出,则cid为0222-0224的数据需pass掉,不能产出),数据流更改如下:

1 def main():
2     for i in range(100):
3         cid = random.randint(131000000201, 131000001299)
4         ctime = time.time()
5         tasks.event_info.put({"cid": cid, "time": ctime})
6         time.sleep(1)

多任务下需求:

1.数据流未来的量会越来越大,需要使用队列存储以保证数据不丢失。

2.每个进程调任务类处理需是调的同一个任务类(对类的所有操作都要同步)

 

一、数据流队列

将 tasks中的event_info改成空队列,然后generate.py中的main函数中将tasks.event_info= 改为tasks.event_info.put,如下:

  tasks.__init__.py

1 from multiprocessing import Queue
2 
3 
4 # 定义变量存储数据流
5 event_info = Queue()

  generate.py

1 def main():
2     for i in range(100):
3         cid = random.randint(131000000201, 131000001299)
4         ctime = time.time()
5         tasks.event_info.put({"cid": cid, "time": ctime})
6         time.sleep(1)

最后在main.py中通过进程池调用generate的main函数,如下:

1 if __name__ == '__main__':
2     process = ProcessPoolExecutor(max_workers=4)
3     process.submit(generate.main)
4     while True:
5         if not tasks.event_info.empty():
6             print(tasks.event_info.get())

发现:tasks中的event_info并不会增加数据,generate的main函数通过加打印可以知道put生效了,因此得知主进程中判断的tasks.event_info和子进程中put的tasks.event_info不是同一个队列。

解决方法:

1.通过线程调用generate的main函数,如下:

1 if __name__ == '__main__':
2     thread = ThreadPoolExecutor(max_workers=4)
3     thread.submit(generate.main)
4     while True:
5         if not tasks.event_info.empty():
6             print(tasks.event_info.get())

2.不用进程池调用generate.main,而直接开一个进程调用generate.main,并将tasks.event_info作为参数传递给generate.main,如下:

1 from multiprocessing import Process
2 if __name__ == '__main__':
3     p1 = Process(target=generate.main, args=(tasks.event_info, ))
4     p1.start()
5     while True:
6         if not tasks.event_info.empty():
7             print(tasks.event_info.get())

  generate.py

1 def main(q):
2     for i in range(100):
3         cid = random.randint(131000000201, 131000001299)
4         ctime = time.time()
5         q.put({"cid": cid, "time": ctime})
6         time.sleep(1)

3.队列改为Manager().Queue(),并作为参数传给generate的main,如下:

1 if __name__ == '__main__':
2     tasks.event_info = Manager().Queue()
3     process = ProcessPoolExecutor(max_workers=4)
4     process.submit(generate.main, tasks.event_info)
5     while True:
6         if not tasks.event_info.empty():
7             print(tasks.event_info.get())

结论:

1.子进程使用的资源是主进程的一份复制资源,子进程对变量的操作不会影响主进程中的变量;子线程对变量的操作会影响主进程中的变量。

2.子进程间通信可以用multiprocessing.Queue(),进程池的子进程间通信需用multiprocessing.Manager().Queue(),都需将队列作为实参数传给子进程调用的函数内。

具体用哪种方法解看你实际使用情况,我这里是用flask_apscheduler任务调度框架调用generate.main生成数据流的,框架采用的多线程方式,因此这一步非常顺利。

 

二、任务类管理

 先讲任务类,模拟一个算法逻辑,300S内如有相邻点位(自己加前后两点位)有数据产出,就pass掉不产出,则算法逻辑如下:

 1 from demo_data_strem import tasks
 2 from .eval_func import add_job_attr
 3 
 4 @add_job_attr(job="123")
 5 class Pass1():
 6 
 7     _pass_dict = {}
 8     _max_time_interval = 5 * 60
 9 
10     def _update_pass_dict(self, cid, ctime):
11         for _cid in [cid - 1, cid, cid + 1]:
12             self._pass_dict[_cid] = ctime
13 
14     def __call__(self, event_info, *args, **kwargs):
15         cid = event_info.get("cid")
16         ctime = event_info.get("time")
17         # print("处理的cid:", cid)
18         # print(self._pass_dict)
19         print(self._pass_dict)
20         if cid in self._pass_dict.keys():
21             if ctime - self._pass_dict.get(cid) < self._max_time_interval:
22                 self._update_pass_dict(cid, ctime)
23                 return
24 
25         self._update_pass_dict(cid, ctime)
26         return event_info
27 
28 def main():
29     tasks.middleware.append(Pass1)

 记录一下这里用了空间换时间的方法。按常理当数据进来,需判断相邻点位是否有产出过,则需写一个三次的循环,判断三个点位(前一点位、该点位、后一点位)是否在_pass_dict中,时间复杂度是3O(n)。改进点的方法可以用set集合取三个点位和_pass_dict中所有点位的交集,交集数量可以是1-3,时间复杂度是O(n)-3O(n)。我这里当数据进来,直接将三个点位数据写入_pass_dict,那么仅需判断当前点位是否在_pass_dict中即可,时间复杂度为O(n)。

方便进程池调用,将循环调用middleware __call__方法封装成handle函数,如下:

 1 def handle(event_info):
 2     print(tasks.middleware)
 3     for middleware_cls in tasks.middleware:
 4         if hasattr(middleware_cls, "job"):
 5             event_info = middleware_cls()(event_info)
 6             if event_info is None:
 7                 return
 8 
 9     if event_info is not None:
10         print(event_info)

直接在main.py中用进程池调用handle函数,如下:

 1 if __name__ == '__main__':
 2     thread = ThreadPoolExecutor(max_workers=4)
 3     thread.submit(generate.main)    # 生成数据流
 4     thread.submit(init_pass1)  # 将任务类Pass1添加进中间件列表
 5 
 6     process = ProcessPoolExecutor(max_workers=4)
 7 
 8     while True:
 9         if not tasks.event_info.empty():
10             event_info = tasks.event_info.get()
11             process.submit(handle, event_info)

结果:

1 {'cid': 131000000617, 'time': 1632192265.173249}
2 {'cid': 131000000365, 'time': 1632192266.1746938}
3 {'cid': 131000001195, 'time': 1632192267.177981}
4 {'cid': 131000000487, 'time': 1632192268.1818829}
5 {'cid': 131000000569, 'time': 1632192269.1828969}
6 {'cid': 131000000520, 'time': 1632192270.1834471}
7 {'cid': 131000001229, 'time': 1632192271.186166}
8 {'cid': 131000000696, 'time': 1632192272.1894841}

发现:线程调用的将任务类添加进中间件类列表中,在子进程内获取的中间件列表还是空的

解决方法:

翻文档知道可以用multiprocessing.managers.BaseManager()来创建共享全局对象,任务类对象应为唯一的,所以定义了一个字典存储这些任务类对象,字典格式:{任务类名称:任务类对象},如下:

 1 if __name__ == '__main__':
 2     thread = ThreadPoolExecutor(max_workers=4)
 3     thread.submit(generate.main)    # 生成数据流
 4     thread.submit(init_pass1)  # 将任务类Pass1添加进中间件列表
 5 
 6     process = ProcessPoolExecutor(max_workers=4)
 7 
 8     # 定义字典存储共享全局对象(直接存储运行中的任务类对象)
 9     run_middleware_obj_dict = {}
10     while True:
11         if not tasks.event_info.empty():
12             event_info = tasks.event_info.get()
13             for middleware in tasks.middleware:
14                 if hasattr(middleware, "job"):
15                     if middleware.__name__ not in run_middleware_obj_dict.keys():
16                         manager = BaseManager()
17                         manager.register(middleware.__name__, middleware)
18                         manager.start()
19                         middleware_obj = getattr(manager, middleware.__name__)()
20                         run_middleware_obj_dict[middleware.__name__] = middleware_obj
21                 else:
22                     # 注意:当任务不在运行,应该将任务类对象从run_middleware_obj_dict中删除
23                     if middleware.__name__ in run_middleware_obj_dict.keys():
24                         del run_middleware_obj_dict[middleware.__name__]
25 
26             # handle(event_info, run_middleware_obj_dict)
27             process.submit(handle, event_info, run_middleware_obj_dict)

由于经manager.register注册的任务类被封装了一层,已经失去原任务类的__call__方法了,所以将__call__方法改为handle,如下:

 1 @add_job_attr(job="123")
 2 class Pass1():
 3 
 4     _pass_dict = {}
 5     _max_time_interval = 5 * 60
 6 
 7     def _update_pass_dict(self, cid, ctime):
 8         for _cid in [cid - 1, cid, cid + 1]:
 9             self._pass_dict[_cid] = ctime
10 
11     def handle(self, event_info, *args, **kwargs):
12         cid = event_info.get("cid")
13         ctime = event_info.get("time")
14         # print("处理的cid:", cid)
15         # print(self._pass_dict)
16         print(self._pass_dict)
17         if cid in self._pass_dict.keys():
18             if ctime - self._pass_dict.get(cid) < self._max_time_interval:
19                 self._update_pass_dict(cid, ctime)
20                 return
21 
22         self._update_pass_dict(cid, ctime)
23         return event_info
24 
25 def main():
26     tasks.middleware.append(Pass1)

  handle函数(main.py中)修改如下:

1 def handle(event_info, run_middleware_obj_dict):
2     for middleware_obj in run_middleware_obj_dict.values():
3         event_info = middleware_obj.handle(event_info)
4         if event_info is None:
5             break
6 
7     if event_info is not None:
8         print(event_info)

结果:

1 {}
2 {'cid': 131000001139, 'time': 1632193144.410032}
3 {131000001138: 1632193144.410032, 131000001139: 1632193144.410032, 131000001140: 1632193144.410032}
4 {'cid': 131000000888, 'time': 1632193145.410145}
5 {131000001138: 1632193144.410032, 131000001139: 1632193144.410032, 131000001140: 1632193144.410032, 131000000887: 1632193145.410145, 131000000888: 1632193145.410145, 131000000889: 1632193145.410145}
6 {'cid': 131000000973, 'time': 1632193146.412236}
7 {131000001138: 1632193144.410032, 131000001139: 1632193144.410032, 131000001140: 1632193144.410032, 131000000887: 1632193145.410145, 131000000888: 1632193145.410145, 131000000889: 1632193145.410145, 131000000972: 1632193146.412236, 131000000973: 1632193146.412236, 131000000974: 1632193146.412236}

为了更好的让进程池中的子进程操作run_middleware_obj_dict,应当将run_middleware_obj_dict也定义为共享全局变量,即run_middleware_obj_dict = Manager().dict(),看文档Manager()创建出来的变量会自带锁,未验证。

附:当使用进程池调用init_pass1(将Pass1类添加到中间件列表)函数,然后直接在main.py中用进程池调用handle函数(main.py中),结果会是下面这样:

 1 {}
 2 {'cid': 131000000923, 'time': 1632149442.6484401}
 3 {'cid': 131000000596, 'time': 1632149443.650462}
 4 {131000000922: 1632149442.6484401, 131000000923: 1632149442.6484401, 131000000924: 1632149442.6484401}
 5 {'cid': 131000000797, 'time': 1632149444.6532}
 6 {'cid': 131000000509, 'time': 1632149445.656755}
 7 {131000000922: 1632149442.6484401, 131000000923: 1632149442.6484401, 131000000924: 1632149442.6484401, 131000000796: 1632149444.6532, 131000000797: 1632149444.6532, 131000000798: 1632149444.6532}
 8 {'cid': 131000000288, 'time': 1632149446.661807}
 9 {'cid': 131000000763, 'time': 1632149447.6639402}
10 {131000000922: 1632149442.6484401, 131000000923: 1632149442.6484401, 131000000924: 1632149442.6484401, 131000000796: 1632149444.6532, 131000000797: 1632149444.6532, 131000000798: 1632149444.6532, 131000000287: 1632149446.661807, 131000000288: 1632149446.661807, 131000000289: 1632149446.661807}

发现:子进程可以获取到中间件列表的变动,但是会出现每两条数据仅一条被中间件类处理,猜想应该是调用init_pass1函数的子进程是知道中间件列表变动的,所以这个子进程去调用handle函数(main.py中)会被中间件类处理,而进程池中的其他子进程不知道中间件列表变动,所以不会被中间件类处理;由于flask_apscheduler框架执行任务是用子线程,这里的情况没有遇到所以没有深究,后续再补解决方案,如有了解的也可以评论区说下想法。

 

最后,理一下multiprocessing提供的共享全局变量吧

一、基本数据类型

1. str,int,float,bool:使用multiprocessing.Value()

2.tuple:使用multiprocessing.Array()

3.list:使用multiprocessing.Manager().list()

4.dict:使用multiprocessing.Manager().dict()

 

二、队列,单向通信

1.子进程间的消息队列:使用multiprocessing.Queue()

2.进程池的子进程间的消息队列:使用multiprocessing.Manager().Queue()

 

三、管道,双向通信:使用multiprocessing.Pipe()

使用这些共享全局变量时,都应将其作为实参传给子进程调用的函数。

 

 

======================

 

记一次更新

 

上面说的将run_middleware_obj_dict也定义为共享全局变量,middleware_obj为共享全局对象有误,multiprocessing创建出的共享全局变量是不支持套娃的,也就是run_middleware_obj_dict里面的值不能为全局共享对象,实测存一个键值对没问题,存两个就会报绑定错误。

其实使用也没必要套娃,只要保证真正传给子进程的变量是全局共享变量即可,因此这里可以将run_middleware_obj_dict定义为普通变量,middleware_obj为共享全局对象,传值的时候将字典中的所有value进行拆包,得到的所有middleware_obj传给handle函数即可。

  main.py修改如下:

 1 if __name__ == '__main__':
 2     thread = ThreadPoolExecutor(max_workers=4)
 3     thread.submit(generate.main)    # 生成数据流
 4     thread.submit(init_pass1)  # 将任务类Pass1添加进中间件列表
 5 
 6     process = ProcessPoolExecutor(max_workers=4)
 7 
 8     # 定义字典存储共享全局对象(直接存储运行中的任务类对象)
 9     run_middleware_obj_dict = dict()
10 
11     # 进程锁,使用Manager().RLock(),可以多次申请锁
12     lock = Manager().RLock()
13     while True:
14         if not tasks.event_info.empty():
15             event_info = tasks.event_info.get()
16 
17             # 更新run_middleware_obj_dict
18             # run_middleware_obj_dict = update_run_obj_dict(run_middleware_obj_dict)
19             for middleware in tasks.middleware:
20                 if hasattr(middleware, "job"):
21                     if middleware.__name__ not in run_middleware_obj_dict:
22                         manager = BaseManager()
23                         manager.register(middleware.__name__, middleware)
24                         manager.start()
25                         middleware_obj = getattr(manager, middleware.__name__)()
26                         run_middleware_obj_dict[middleware.__name__] = middleware_obj
27                 else:
28                     # 注意:当任务不在运行,应该将任务类对象从run_middleware_obj_dict中删除
29                     if middleware.__name__ in run_middleware_obj_dict:
30                         del run_middleware_obj_dict[middleware.__name__]
31 
32             # handle(event_info, run_middleware_obj_dict)
33             # 将字典中的所有全局对象拆包传给handle函数
34             process.submit(handle, event_info, *run_middleware_obj_dict.values())

  main.py中的handle函数:

1 def handle(event_info, *args):
2     for middleware_obj in args:
3         event_info = middleware_obj.handle(event_info)
4         if event_info is None:
5             break
6 
7     if event_info is not None:
8         print(event_info)

 

题外记录:

1.要知道python运行会有一个主进程和主线程,子线程和主线程共享资源,子进程会有一份主进程copy的资源,但和主进程资源分开(非全局共享对象/变量)

2.操作数据时,多任务无论是多进程还是多线程,对数据的改动(增删)和对数据的遍历应该分任务隔开。遍历的同时数据发生增删操作会报错,可以使用deepcopy

 


  • 作者:合十
  • 发表时间:2021年9月21日 11:51
  • 更新时间:2022年8月18日 22:12
  • 所属分类:我用Python

Comments

该文章还未收到评论,点击下方评论框开始评论吧~