讲讲数据流来源,项目中需求:实时获取mq数据并进行写入数据库和作为数据流传给算法分析。
mq用的阿里(现已转Apache)开源的分布式rocketmq。为此用的python模块是rocketmq-python。
使用前安装:
# 1.安装rocketmq-python pip install rocketmq # 2.安装rocketmq-client-cpp环境 wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm # 如果运行报错【OSError: librocketmq.so: cannot open shared object file: No such file or dir】再执行下面操作 find / -name librocketmq.so ln -s 上面find找到的路径/librocketmq.so /usr/lib/ sudo ldconfig
在此之前是几乎完全不了解rocketmq的,但得益于python的友好性,给个damo就直接用上了,rocketmq-python消费有两种方式:PullConsumer 和 PushConsumer,PullConsumer为全部消费(可重复消费);PushConsumer为即时消费(不可重复消费)。最初代码大致如下:
1 from demo_data_strem import tasks 2 from rocketmq.client import PushConsumer 3 import json 4 5 class DataPersistence(): 6 7 def __init__(self, ip, consumer, topic): 8 self._topic = topic 9 self._consumer = PushConsumer(consumer) 10 self._consumer.set_namesrv_addr(ip) 11 12 def _generate(self, dataset): 13 json_data = json.load(dataset.body.strip()) 14 # 简单做个打印操作 15 print(json_data) 16 tasks.event_info.put(json_data) 17 18 19 def main(self): 20 self._consumer.subscribe(self._topic, self._generate) 21 self._consumer.start() 22 23 24 if __name__ == '__main__': 25 persistence_obj = DataPersistence("192.168.1.2", "analysis", "event") 26 while True: 27 persistence_obj.main()
使用时发现程序内存会持续增加,小概率会报MemoryError错误。
定位错误代码是在 self._consumer.subscribe(self._topic, self._generate) 这里面。起初以为这是这个模块的BUG,为了记住消费进度会保存mq中数据的引用,从而达到不重复消费目的。
该模块作者(messense)还写了另一个模块rocketmq-client-python,在README中看到给subscribe的callback函数中返回了一个类似flag的东西,以为是替换了rocketmq-python的记住消费进度方案,很有可能可以解决内存溢出问题。
于是修改如下:
1 from demo_data_strem import tasks 2 from rocketmq.client import PushConsumer, ConsumeStatus 3 import json 4 5 class DataPersistence(): 6 7 def __init__(self, ip, consumer, topic): 8 self._topic = topic 9 self._consumer = PushConsumer(consumer) 10 self._consumer.set_name_server_address(ip) 11 12 def _generate(self, dataset): 13 json_data = json.load(dataset.body.strip()) 14 # 简单做个打印操作 15 print(json_data) 16 tasks.event_info.put(json_data) 17 18 return ConsumeStatus.CONSUME_SUCCESS 19 20 21 def main(self): 22 self._consumer.subscribe(self._topic, self._generate) 23 self._consumer.start() 24 25 26 if __name__ == '__main__': 27 persistence_obj = DataPersistence("192.168.1.2", "analysis", "event") 28 while True: 29 persistence_obj.main()
使用发现没有MemoryError错误了,但是内存依然会持续增加,增加幅度相对之前低了些。当然还得继续优化。
比较难的是网上找不到该模块的任何详细介绍,也没有发现出现过类似情况的帖,在github上提问中也没有类似问题。
再仔细看README发现,作者的PushConsumer的demo中subscribe和start都没有写在循环内,写在循环内的只是一个sleep,如下:
1 import time 2 3 from rocketmq.client import PushConsumer, ConsumeStatus 4 5 6 def callback(msg): 7 print(msg.id, msg.body) 8 return ConsumeStatus.CONSUME_SUCCESS 9 10 11 consumer = PushConsumer('CID_XXX') 12 consumer.set_name_server_address('127.0.0.1:9876') 13 consumer.subscribe('YOUR-TOPIC', callback) 14 consumer.start() 15 16 while True: 17 time.sleep(3600) 18 19 consumer.shutdown()
循环的意义貌似只是让程序不终止,模拟作者demo的写法,去掉where True发现消费到几条数据后程序就终止了。得以下几条结论:
1.subscribe只是将topic和回调函数绑定,该步骤不会去消费,调用start方法后该topic消费到的数据交给回调函数处理
2.看源码回调函数会交给一个列表,猜测可以实现:一个consumer可以一个topic可以绑定多个回调函数;一个consumer可以多个topic绑定不同回调函数一起消费。还未做验证,待更新。
3.同一个topic 同一个回调函数只需绑定一次,subscribe函数会重复使用内存。
4.start也只调用一次即可,重复调用无意义。
5.调用consumer.start()后只需想办法让主进程(主线程)不终止即可持续消费。
正确使用PushConsumer消费的方法参照作者的demo即可,最终修改如下:
1 from demo_data_strem import tasks 2 from rocketmq.client import PushConsumer, ConsumeStatus 3 import json 4 import time 5 6 class DataPersistence(): 7 8 def __init__(self, ip, consumer, topic): 9 self._topic = topic 10 self._consumer = PushConsumer(consumer) 11 self._consumer.set_name_server_address(ip) 12 self._consumer.subscribe(self._topic, self._generate) 13 14 def _generate(self, dataset): 15 json_data = json.load(dataset.body.strip()) 16 # 简单做个打印操作 17 print(json_data) 18 tasks.event_info.put(json_data) 19 20 return ConsumeStatus.CONSUME_SUCCESS 21 22 def main(self): 23 self._consumer.start() 24 25 def close(self): 26 self._consumer.shutdown() 27 28 29 if __name__ == '__main__': 30 persistence_obj = DataPersistence("192.168.1.2", "analysis", "event") 31 persistence_obj.main() 32 while True: 33 time.sleep(60) 34 persistence_obj.close()
Comments