从数据流讲到python多模块导入中的坑

转载请注明出处:http://www.he10.top/article/detail/38

随便写一个项目甚至工具包,都不会是仅用一个py文件去实现的,这样代码耦合性非常高。会有多个模块之间连调实现,项目中还会还会用到一个名次:工厂(工程)化

最近在搞一个任务的管控系统,叫算法二次分析。背景是将算法一次分析产出的数据,进行二次分析,包括去重、进一步误报筛选、发现疑似事件等。

大概环节是这样的:

将原始数据作为数据流,按顺序传给下面一个一个任务,需求:

1.每个任务都可以run,也可以不run,不run则直接交给后面一个任务

2.每个任务都可以决定这条数据是否需要产出,不产出则不往下进行,只有最后一个任务也决定需要产出时才需要产出

3.每个任务从关闭到开始,都取最新的数据流

4.没有任务时,不需要产出

有点类似django的中间件原理,任务对应各中间件,请求来临(数据流过来)后,按顺序去执行每个中间件中的对应函数,不过django的中间件是静态注册的,这里需要动态创建。

 写一个工具包做简单演练:

工具包组成:

 init.py(存储数据流和中间件列表):

1 # 定义列表存储任务类
2 middleware = []
3 
4 # 定义变量存储数据流
5 event_info = {}

eval_func.py(定义了一个装饰器,给任务类装饰,这里只是简单给类加了一个job属性,值任意,实际是需要将类对应的任务对象作为值)

1 def add_job_attr(**kwargs):
2     def wrapper(cls):
3         for key, value in kwargs.items():
4             setattr(cls, key, value)
5         return cls
6 
7     return wrapper

generate.py(更新数据流中数据为最新数据)

1 from demo_data_strem import tasks
2 from time import sleep
3 
4 def main():
5     for i in range(100):
6         tasks.event_info = {"cid":i}
7         sleep(1)

pass_func1.py(任务1,简单判断任务流中的value整除2,则通过)

 1 from demo_data_strem import tasks
 2 from .eval_func import add_job_attr
 3 
 4 @add_job_attr(job="123")
 5 class Pass1():
 6 
 7     def __call__(self, event_info, *args, **kwargs):
 8         for key, value in event_info.items():
 9             if value % 2 == 0:
10                 return event_info
11 
12 
13 def main():
14     tasks.middleware.append(Pass1)

pass_func2.py(任务2,简单判断任务流中的value整除3,则通过)

 1 from demo_data_strem import tasks
 2 from .eval_func import add_job_attr
 3 
 4 @add_job_attr(job="123")
 5 class Pass2():
 6 
 7     def __call__(self, event_info, *args, **kwargs):
 8         for key, value in event_info.items():
 9             if value % 3 == 0:
10                 return event_info
11 
12 def main():
13     tasks.middleware.append(Pass2)

main.py(项目主函数)

 1 from demo_data_strem import tasks
 2 from demo_data_strem.tasks import generate, pass_fucn1, pass_func2
 3 from concurrent.futures import ThreadPoolExecutor
 4 import time
 5 
 6 # 已分析任务流,用于判断数据流中数据是否是最新数据
 7 pass_event_info = {}
 8 
 9 thread = ThreadPoolExecutor(max_workers=4)
10 
11 def init_pass1():
12     # 1秒后添加任务1
13     time.sleep(1)
14     pass_fucn1.main()
15 
16 def init_pass2():
17     # 10秒后添加任务2
18     time.sleep(10)
19     pass_func2.main()
20 
21 def remove_pass1():
22     # 20秒后删除任务1
23     time.sleep(20)
24     tasks.middleware.remove(pass_fucn1.Pass1)
25 
26 thread.submit(generate.main)
27 thread.submit(init_pass1)
28 thread.submit(init_pass2)
29 thread.submit(remove_pass1)
30 
31 while True:
32     # 任务流中有数据,且任务类列表中有任务类,且数据流为最新
33     if tasks.event_info and tasks.middleware and (not pass_event_info or pass_event_info != tasks.event_info):
34         # 发现数据流可分析时,让已分析任务流等于当前任务流
35         pass_event_info = tasks.event_info
36         # 定义最终产出数据
37         event_info = tasks.event_info
38         for middleware_cls in tasks.middleware:
39             # 这里简单判断任务类是否含job属性,实际的复杂一些,实际需判断对应的job任务状态是否为running,逻辑是一样的
40             if hasattr(middleware_cls, "job"):
41                 event_info = middleware_cls()(tasks.event_info)
42                 if event_info is None:
43                     break
44             else:
45                 continue
46 
47         if event_info is not None:
48             print(event_info)

讲解:

我这里每秒将数据流中数据更新为{"cid":time},1秒后任务1启动,10秒后任务2启动,20秒后删除任务1,那么结果将是:1秒后cid的值需整初2,则2、4、6、8打印出来;10秒后cid的值需整除6,那么12、18打印出来;20秒后cid的值需整除3,那么21、24、27。。。打印出来

结果:

 1 python main.py
 2 
 3 {'cid': 2}
 4 {'cid': 4}
 5 {'cid': 6}
 6 {'cid': 8}
 7 {'cid': 12}
 8 {'cid': 18}
 9 {'cid': 21}
10 {'cid': 24}
11 {'cid': 27}
12 {'cid': 30}

Done

 

---------------

讲到最后,其实python的导入很坑的,比如上面的其他模块导入middleware和event_info,如果用from demo_data_strem.tasks import middleware, event_info时,重新赋值不会生效,append或update会生效

分析如下:

 


  • 作者:合十
  • 发表时间:2021年9月9日 02:47
  • 更新时间:2024年11月30日 09:40
  • 所属分类:我用Python

Comments

该文章还未收到评论,点击下方评论框开始评论吧~