转载请注明出处:http://www.he10.top/article/detail/38
随便写一个项目甚至工具包,都不会是仅用一个py文件去实现的,这样代码耦合性非常高。会有多个模块之间连调实现,项目中还会还会用到一个名次:工厂(工程)化
最近在搞一个任务的管控系统,叫算法二次分析。背景是将算法一次分析产出的数据,进行二次分析,包括去重、进一步误报筛选、发现疑似事件等。
大概环节是这样的:
将原始数据作为数据流,按顺序传给下面一个一个任务,需求:
1.每个任务都可以run,也可以不run,不run则直接交给后面一个任务
2.每个任务都可以决定这条数据是否需要产出,不产出则不往下进行,只有最后一个任务也决定需要产出时才需要产出
3.每个任务从关闭到开始,都取最新的数据流
4.没有任务时,不需要产出
有点类似django的中间件原理,任务对应各中间件,请求来临(数据流过来)后,按顺序去执行每个中间件中的对应函数,不过django的中间件是静态注册的,这里需要动态创建。
写一个工具包做简单演练:
工具包组成:
init.py(存储数据流和中间件列表):
1 # 定义列表存储任务类 2 middleware = [] 3 4 # 定义变量存储数据流 5 event_info = {}
eval_func.py(定义了一个装饰器,给任务类装饰,这里只是简单给类加了一个job属性,值任意,实际是需要将类对应的任务对象作为值)
1 def add_job_attr(**kwargs): 2 def wrapper(cls): 3 for key, value in kwargs.items(): 4 setattr(cls, key, value) 5 return cls 6 7 return wrapper
generate.py(更新数据流中数据为最新数据)
1 from demo_data_strem import tasks 2 from time import sleep 3 4 def main(): 5 for i in range(100): 6 tasks.event_info = {"cid":i} 7 sleep(1)
pass_func1.py(任务1,简单判断任务流中的value整除2,则通过)
1 from demo_data_strem import tasks 2 from .eval_func import add_job_attr 3 4 @add_job_attr(job="123") 5 class Pass1(): 6 7 def __call__(self, event_info, *args, **kwargs): 8 for key, value in event_info.items(): 9 if value % 2 == 0: 10 return event_info 11 12 13 def main(): 14 tasks.middleware.append(Pass1)
pass_func2.py(任务2,简单判断任务流中的value整除3,则通过)
1 from demo_data_strem import tasks 2 from .eval_func import add_job_attr 3 4 @add_job_attr(job="123") 5 class Pass2(): 6 7 def __call__(self, event_info, *args, **kwargs): 8 for key, value in event_info.items(): 9 if value % 3 == 0: 10 return event_info 11 12 def main(): 13 tasks.middleware.append(Pass2)
main.py(项目主函数)
1 from demo_data_strem import tasks 2 from demo_data_strem.tasks import generate, pass_fucn1, pass_func2 3 from concurrent.futures import ThreadPoolExecutor 4 import time 5 6 # 已分析任务流,用于判断数据流中数据是否是最新数据 7 pass_event_info = {} 8 9 thread = ThreadPoolExecutor(max_workers=4) 10 11 def init_pass1(): 12 # 1秒后添加任务1 13 time.sleep(1) 14 pass_fucn1.main() 15 16 def init_pass2(): 17 # 10秒后添加任务2 18 time.sleep(10) 19 pass_func2.main() 20 21 def remove_pass1(): 22 # 20秒后删除任务1 23 time.sleep(20) 24 tasks.middleware.remove(pass_fucn1.Pass1) 25 26 thread.submit(generate.main) 27 thread.submit(init_pass1) 28 thread.submit(init_pass2) 29 thread.submit(remove_pass1) 30 31 while True: 32 # 任务流中有数据,且任务类列表中有任务类,且数据流为最新 33 if tasks.event_info and tasks.middleware and (not pass_event_info or pass_event_info != tasks.event_info): 34 # 发现数据流可分析时,让已分析任务流等于当前任务流 35 pass_event_info = tasks.event_info 36 # 定义最终产出数据 37 event_info = tasks.event_info 38 for middleware_cls in tasks.middleware: 39 # 这里简单判断任务类是否含job属性,实际的复杂一些,实际需判断对应的job任务状态是否为running,逻辑是一样的 40 if hasattr(middleware_cls, "job"): 41 event_info = middleware_cls()(tasks.event_info) 42 if event_info is None: 43 break 44 else: 45 continue 46 47 if event_info is not None: 48 print(event_info)
讲解:
我这里每秒将数据流中数据更新为{"cid":time},1秒后任务1启动,10秒后任务2启动,20秒后删除任务1,那么结果将是:1秒后cid的值需整初2,则2、4、6、8打印出来;10秒后cid的值需整除6,那么12、18打印出来;20秒后cid的值需整除3,那么21、24、27。。。打印出来
结果:
1 python main.py 2 3 {'cid': 2} 4 {'cid': 4} 5 {'cid': 6} 6 {'cid': 8} 7 {'cid': 12} 8 {'cid': 18} 9 {'cid': 21} 10 {'cid': 24} 11 {'cid': 27} 12 {'cid': 30}
Done
---------------
讲到最后,其实python的导入很坑的,比如上面的其他模块导入middleware和event_info,如果用from demo_data_strem.tasks import middleware, event_info时,重新赋值不会生效,append或update会生效
分析如下:
Comments