Python场景实现记录

一、利用property属性封装当遇到需重新链接、重新打开等操作时的一系列复杂操作,做到优雅重载

用例背景:有一个项目是需要ssh远程连接多台设备,不停的和各设备交互达到设备检测校验的需求,比较难搞的是设备连接中断情况下的处理逻辑。因此利用property属性将与设备的连接封装起来,连接后对象的操作用捕获异常方式放在闭包中调用,当调用出错时根据异常情况做相应处理,以达到通知连接断开的目的,且完全不影响后续的交互,内部会做到重新连接。

  下面,看下示例代码:

  1 import paramiko
  2 import socket
  3 import time
  4 
  5 
  6 class SSHPipeClient:
  7 
  8     def __init__(self, ip, port, username, pwd):
  9         self._ip = ip
 10         self._port = port
 11         self._username = username
 12         self._pwd = pwd
 13         self._transport = None                  # paramiko.Transport对象
 14         self._channel = None                    # ssh通道对象
 15         self._sftp = None                       # sftp对象
 16 
 17     @property
 18     def channel(self):
 19         """
 20         ssh通道property属性,当paramiko.Transport对象不存在或_channel不存在时,内部实现重连
 21         :return: ssh通道对象
 22         """
 23         if self._transport is None:
 24             self._transport = paramiko.Transport((self._ip, self._port))
 25             self._transport.connect(username=self._username, password=self._pwd)
 26 
 27             self._channel = self._transport.open_session()
 28             self._channel.get_pty()
 29             self._channel.invoke_shell()
 30         elif self._channel is None:
 31             self._channel = self._transport.open_session()
 32             self._channel.get_pty()
 33             self._channel.invoke_shell()
 34 
 35         return self._channel
 36 
 37     @property
 38     def sftp(self):
 39         """
 40         sftp property属性,当paramiko.Transport对象不存在或_channel不存在时,内部实现重连
 41         :return: sftp对象
 42         """
 43         if self._transport is None:
 44             self._transport = paramiko.Transport((self._ip, self._port))
 45             self._transport.connect(username=self._username, password=self._pwd)
 46             self._sftp = self._transport.open_sftp_client()
 47         elif self._sftp is None:
 48             self._sftp = self._transport.open_sftp_client()
 49 
 50         return self._sftp
 51 
 52     def send_and_recv(self, cmd, timeout=2):
 53         """
 54         利用ssh通道发送命令至远程主机,并获取返回结果
 55         :param str cmd: 欲发生的命令
 56         :param int timeout: 发送命令后到拿取结果的等待时间
 57         :return: 主机返回结果 或 None
 58         """
 59         def _inner(cmd):
 60             self.channel.send(f'{cmd}\n')
 61             time.sleep(timeout)
 62             result = self.channel.recv(65535).decode()
 63             return result
 64 
 65         return self._handle_if_fail_connect(_inner, cmd)
 66 
 67     def get(self, remote_path, local_path):
 68         """
 69         利用sftp对象从远程主机下载文件
 70         :param str remote_path: 远程主机文件路径
 71         :param str local_path: 本地文件路径
 72         :return: '' 或 None
 73         """
 74         def _inner(remote_path, local_path):
 75             self.sftp.get(remote_path, local_path)
 76 
 77         return self._handle_if_fail_connect(_inner, remote_path, local_path)
 78 
 79     def put(self, local_path, remote_path):
 80         """
 81         利用sftp对象上传文件至远程主机
 82         :param str local_path: 本地文件路径
 83         :param str remote_path: 远程主机文件路径
 84         :return: '' 或 None
 85         """
 86         def _inner(local_path, remote_path):
 87             self.sftp.put(local_path, remote_path)
 88 
 89         return self._handle_if_fail_connect(_inner, local_path, remote_path)
 90 
 91     def _handle_if_fail_connect(self, func, *args, **kwargs):
 92         """
 93         捕获异常函数,将与远程主机交互的函数进行捕获,并根据异常结果做相应处理
 94         :param callable func: 与远程主机交互的函数
 95         :param args: 函数位置参数
 96         :param kwargs: 函数关键字参数
 97         :return: 原交互结果 或 None
 98         """
 99         try:
100             result = func(*args, **kwargs) or ''
101         # 下面仅对连接异常做捕获,可对sftp操作的异常先做捕获,这样就能区分是上传/下载文件异常还是连接异常
102         except (socket.timeout, ConnectionResetError, paramiko.ssh_exception.SSHException):
103             self._close()
104             return None
105         else:
106             return result
107 
108     def _close(self):
109         """
110         当与远程主机交互出现异常时的处理
111         :return: None
112         """
113         self._channel.close()
114         self._sftp.close()
115         self._transport.close()
116 
117         self._transport = None

  需要注意的是:python在调用函数时当传入的参数为property属性或函数时,会先执行property属性或函数的代码,后再将其返回值传入到函数执行,因此这里与远程主机的操作都放在_inner闭包函数中。

二、封装集体线程等待(红绿灯)

用例背景:在用多线程对多台设备操作时,有一个需求时当所有设备都到达某一状态时等待 去执行某个操作,操作执行完后各线程再执行后续操作,首先想到的就是threading.Event,但监测所有线程都达到wait后执行操作再放行这一需求,如果按常理每遇到一次就写个死循环去监测实在不够优雅,于是封装了一个模块,实现等待和放行的地方仅需一行代码完成需求。

  下面,看示例代码:

 1 import time
 2 from threading import Event
 3 
 4 
 5 __all__ = [
 6     "ThreadEvent"
 7 ]
 8 
 9 
10 _event_map = {}
11 
12 
13 class ThreadEvent:
14 
15     def __init__(self, event, name):
16         """
17         自定义线程事件类,按业务需求对原线程事件二次封装
18         :param event: 原线程事件
19         :param name: 一组使用同一线程事件的标识名称
20         """
21         event: Event
22         global _event_map
23 
24         self._event = event
25         self._name = name
26 
27         try:
28             _event_map[self._name]["total"] += 1
29         except KeyError:
30             _event_map[self._name] = {
31                 "total": 1,
32                 "wait": 0
33             }
34 
35     def wait(self):
36         """
37         线程等待
38         :return: None
39         """
40         global _event_map
41         _event_map[self._name]["wait"] += 1
42 
43         self._event.wait()
44 
45     def release(self, func):
46         """
47         线程放行,当wait的线程数和total数-1(因为有一个负责放行,不会调用wait)一致时,则执行操作后放行,放行后立即锁住
48         :param func:
49         :return:
50         """
51         global _event_map
52 
53         while _event_map[self._name]["wait"] != _event_map[self._name]["total"] - 1:
54                 time.sleep(1)
55 
56         func()
57         self._event.set()
58 
59         _event_map[self._name]["wait"] = 0
60         self._event.clear()

  使用方:

 1 import time
 2 from threading import Thread, Event
 3 
 4 from demo_thread_event.module import ThreadEvent
 5 
 6 
 7 device_name = 'chroma'
 8 
 9 
10 def do_work1(event):
11     event_ = ThreadEvent(event, device_name)
12     while True:
13         print("work【1】开始新一轮工作")
14         # sleep模拟该线程前序操作
15         time.sleep(5)
16         print("work【1】进入等待。。。")
17         event_.wait()
18 
19         print("work【1】执行后续操作")
20         # sleep模拟该线程后续操作
21         time.sleep(3)
22 
23 
24 def do_work2(event):
25     event_ = ThreadEvent(event, device_name)
26     while True:
27         print("work【2】开始新一轮工作")
28         # sleep模拟该线程前序操作
29         time.sleep(10)
30         print("work【2】进入等待。。。")
31         event_.wait()
32 
33         print("work【2】执行后续操作")
34         # sleep模拟该线程后续操作
35         time.sleep(2)
36 
37 
38 if __name__ == '__main__':
39     e = Event()
40     # 1.用原始线程事件创建一个放行事件
41     main_event = ThreadEvent(e, device_name)
42 
43     # 2.开启两个线程工作,在需要等待的地方调用wait等待
44     t1 = Thread(target=do_work1, args=(e, ))
45     t2 = Thread(target=do_work2, args=(e, ))
46 
47     t1.start()
48     t2.start()
49 
50     while True:
51         # 3.一直循环调用放行,sleep 5秒模拟当所有线程都在等待时执行的操作,执行完成后放行
52         main_event.release(lambda :time.sleep(5))

  运行结果符合预期:

 1 work【1】开始新一轮工作
 2 work【2】开始新一轮工作
 3 work【1】进入等待。。。
 4 work【2】进入等待。。。
 5 work【1】执行后续操作work【2】执行后续操作
 6 
 7 work【2】开始新一轮工作
 8 work【1】开始新一轮工作
 9 work【1】进入等待。。。
10 work【2】进入等待。。。
11 work【1】执行后续操作work【2】执行后续操作
12 
13 work【2】开始新一轮工作
14 work【1】开始新一轮工作
15 work【1】进入等待。。。
16 work【2】进入等待。。。

 


  • 作者:合十
  • 发表时间:2023年2月22日 22:35
  • 更新时间:2024年6月16日 09:48
  • 所属分类:我用Python

Comments

该文章还未收到评论,点击下方评论框开始评论吧~