解决python操作rocketmq时的内存溢出

讲讲数据流来源,项目中需求:实时获取mq数据并进行写入数据库和作为数据流传给算法分析。

mq用的阿里(现已转Apache)开源的分布式rocketmq。为此用的python模块是rocketmq-python。

使用前安装:

# 1.安装rocketmq-python
pip install rocketmq

# 2.安装rocketmq-client-cpp环境
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm
sudo rpm -ivh rocketmq-client-cpp-2.0.0-centos7.x86_64.rpm

# 如果运行报错【OSError: librocketmq.so: cannot open shared object file: No such file or dir】再执行下面操作
find / -name librocketmq.so
ln -s 上面find找到的路径/librocketmq.so /usr/lib/
sudo ldconfig

在此之前是几乎完全不了解rocketmq的,但得益于python的友好性,给个damo就直接用上了,rocketmq-python消费有两种方式:PullConsumer 和 PushConsumer,PullConsumer为全部消费(可重复消费);PushConsumer为即时消费(不可重复消费)。最初代码大致如下:

 1 from demo_data_strem import tasks
 2 from rocketmq.client import PushConsumer
 3 import json
 4 
 5 class DataPersistence():
 6 
 7     def __init__(self, ip, consumer, topic):
 8         self._topic = topic
 9         self._consumer = PushConsumer(consumer)
10         self._consumer.set_namesrv_addr(ip)
11 
12     def _generate(self, dataset):
13         json_data = json.load(dataset.body.strip())
14         # 简单做个打印操作
15         print(json_data)
16         tasks.event_info.put(json_data)
17 
18 
19     def main(self):
20         self._consumer.subscribe(self._topic, self._generate)
21         self._consumer.start()
22 
23 
24 if __name__ == '__main__':
25     persistence_obj = DataPersistence("192.168.1.2", "analysis", "event")
26     while True:
27         persistence_obj.main()

使用时发现程序内存会持续增加,小概率会报MemoryError错误。

定位错误代码是在   self._consumer.subscribe(self._topic, self._generate)   这里面。起初以为这是这个模块的BUG,为了记住消费进度会保存mq中数据的引用,从而达到不重复消费目的。

该模块作者(messense)还写了另一个模块rocketmq-client-python,在README中看到给subscribe的callback函数中返回了一个类似flag的东西,以为是替换了rocketmq-python的记住消费进度方案,很有可能可以解决内存溢出问题。

于是修改如下:

 1 from demo_data_strem import tasks
 2 from rocketmq.client import PushConsumer, ConsumeStatus
 3 import json
 4 
 5 class DataPersistence():
 6 
 7     def __init__(self, ip, consumer, topic):
 8         self._topic = topic
 9         self._consumer = PushConsumer(consumer)
10         self._consumer.set_name_server_address(ip)
11 
12     def _generate(self, dataset):
13         json_data = json.load(dataset.body.strip())
14         # 简单做个打印操作
15         print(json_data)
16         tasks.event_info.put(json_data)
17 
18         return ConsumeStatus.CONSUME_SUCCESS
19 
20 
21     def main(self):
22         self._consumer.subscribe(self._topic, self._generate)
23         self._consumer.start()
24 
25 
26 if __name__ == '__main__':
27     persistence_obj = DataPersistence("192.168.1.2", "analysis", "event")
28     while True:
29         persistence_obj.main()

使用发现没有MemoryError错误了,但是内存依然会持续增加,增加幅度相对之前低了些。当然还得继续优化。

比较难的是网上找不到该模块的任何详细介绍,也没有发现出现过类似情况的帖,在github上提问中也没有类似问题。

再仔细看README发现,作者的PushConsumer的demo中subscribe和start都没有写在循环内,写在循环内的只是一个sleep,如下:

 1 import time
 2 
 3 from rocketmq.client import PushConsumer, ConsumeStatus
 4 
 5 
 6 def callback(msg):
 7     print(msg.id, msg.body)
 8     return ConsumeStatus.CONSUME_SUCCESS
 9 
10 
11 consumer = PushConsumer('CID_XXX')
12 consumer.set_name_server_address('127.0.0.1:9876')
13 consumer.subscribe('YOUR-TOPIC', callback)
14 consumer.start()
15 
16 while True:
17     time.sleep(3600)
18 
19 consumer.shutdown()

循环的意义貌似只是让程序不终止,模拟作者demo的写法,去掉where True发现消费到几条数据后程序就终止了。得以下几条结论:

1.subscribe只是将topic和回调函数绑定,该步骤不会去消费,调用start方法后该topic消费到的数据交给回调函数处理

2.看源码回调函数会交给一个列表,猜测可以实现:一个consumer可以一个topic可以绑定多个回调函数;一个consumer可以多个topic绑定不同回调函数一起消费。还未做验证,待更新。

3.同一个topic 同一个回调函数只需绑定一次,subscribe函数会重复使用内存。

4.start也只调用一次即可,重复调用无意义。

5.调用consumer.start()后只需想办法让主进程(主线程)不终止即可持续消费。

正确使用PushConsumer消费的方法参照作者的demo即可,最终修改如下:

 1 from demo_data_strem import tasks
 2 from rocketmq.client import PushConsumer, ConsumeStatus
 3 import json
 4 import time
 5 
 6 class DataPersistence():
 7 
 8     def __init__(self, ip, consumer, topic):
 9         self._topic = topic
10         self._consumer = PushConsumer(consumer)
11         self._consumer.set_name_server_address(ip)
12         self._consumer.subscribe(self._topic, self._generate)
13 
14     def _generate(self, dataset):
15         json_data = json.load(dataset.body.strip())
16         # 简单做个打印操作
17         print(json_data)
18         tasks.event_info.put(json_data)
19 
20         return ConsumeStatus.CONSUME_SUCCESS
21 
22     def main(self):
23         self._consumer.start()
24 
25     def close(self):
26         self._consumer.shutdown()
27 
28 
29 if __name__ == '__main__':
30     persistence_obj = DataPersistence("192.168.1.2", "analysis", "event")
31     persistence_obj.main()
32     while True:
33         time.sleep(60)
34     persistence_obj.close()

 


  • 作者:合十
  • 发表时间:2021年11月18日 16:26
  • 更新时间:2022年9月30日 19:51
  • 所属分类:我用Python

Comments

该文章还未收到评论,点击下方评论框开始评论吧~