一、利用property属性封装当遇到需重新链接、重新打开等操作时的一系列复杂操作,做到优雅重载
用例背景:有一个项目是需要ssh远程连接多台设备,不停的和各设备交互达到设备检测校验的需求,比较难搞的是设备连接中断情况下的处理逻辑。因此利用property属性将与设备的连接封装起来,连接后对象的操作用捕获异常方式放在闭包中调用,当调用出错时根据异常情况做相应处理,以达到通知连接断开的目的,且完全不影响后续的交互,内部会做到重新连接。
下面,看下示例代码:
1 import paramiko 2 import socket 3 import time 4 5 6 class SSHPipeClient: 7 8 def __init__(self, ip, port, username, pwd): 9 self._ip = ip 10 self._port = port 11 self._username = username 12 self._pwd = pwd 13 self._transport = None # paramiko.Transport对象 14 self._channel = None # ssh通道对象 15 self._sftp = None # sftp对象 16 17 @property 18 def channel(self): 19 """ 20 ssh通道property属性,当paramiko.Transport对象不存在或_channel不存在时,内部实现重连 21 :return: ssh通道对象 22 """ 23 if self._transport is None: 24 self._transport = paramiko.Transport((self._ip, self._port)) 25 self._transport.connect(username=self._username, password=self._pwd) 26 27 self._channel = self._transport.open_session() 28 self._channel.get_pty() 29 self._channel.invoke_shell() 30 elif self._channel is None: 31 self._channel = self._transport.open_session() 32 self._channel.get_pty() 33 self._channel.invoke_shell() 34 35 return self._channel 36 37 @property 38 def sftp(self): 39 """ 40 sftp property属性,当paramiko.Transport对象不存在或_channel不存在时,内部实现重连 41 :return: sftp对象 42 """ 43 if self._transport is None: 44 self._transport = paramiko.Transport((self._ip, self._port)) 45 self._transport.connect(username=self._username, password=self._pwd) 46 self._sftp = self._transport.open_sftp_client() 47 elif self._sftp is None: 48 self._sftp = self._transport.open_sftp_client() 49 50 return self._sftp 51 52 def send_and_recv(self, cmd, timeout=2): 53 """ 54 利用ssh通道发送命令至远程主机,并获取返回结果 55 :param str cmd: 欲发生的命令 56 :param int timeout: 发送命令后到拿取结果的等待时间 57 :return: 主机返回结果 或 None 58 """ 59 def _inner(cmd): 60 self.channel.send(f'{cmd}\n') 61 time.sleep(timeout) 62 result = self.channel.recv(65535).decode() 63 return result 64 65 return self._handle_if_fail_connect(_inner, cmd) 66 67 def get(self, remote_path, local_path): 68 """ 69 利用sftp对象从远程主机下载文件 70 :param str remote_path: 远程主机文件路径 71 :param str local_path: 本地文件路径 72 :return: '' 或 None 73 """ 74 def _inner(remote_path, local_path): 75 self.sftp.get(remote_path, local_path) 76 77 return self._handle_if_fail_connect(_inner, remote_path, local_path) 78 79 def put(self, local_path, remote_path): 80 """ 81 利用sftp对象上传文件至远程主机 82 :param str local_path: 本地文件路径 83 :param str remote_path: 远程主机文件路径 84 :return: '' 或 None 85 """ 86 def _inner(local_path, remote_path): 87 self.sftp.put(local_path, remote_path) 88 89 return self._handle_if_fail_connect(_inner, local_path, remote_path) 90 91 def _handle_if_fail_connect(self, func, *args, **kwargs): 92 """ 93 捕获异常函数,将与远程主机交互的函数进行捕获,并根据异常结果做相应处理 94 :param callable func: 与远程主机交互的函数 95 :param args: 函数位置参数 96 :param kwargs: 函数关键字参数 97 :return: 原交互结果 或 None 98 """ 99 try: 100 result = func(*args, **kwargs) or '' 101 # 下面仅对连接异常做捕获,可对sftp操作的异常先做捕获,这样就能区分是上传/下载文件异常还是连接异常 102 except (socket.timeout, ConnectionResetError, paramiko.ssh_exception.SSHException): 103 self._close() 104 return None 105 else: 106 return result 107 108 def _close(self): 109 """ 110 当与远程主机交互出现异常时的处理 111 :return: None 112 """ 113 self._channel.close() 114 self._sftp.close() 115 self._transport.close() 116 117 self._transport = None
需要注意的是:python在调用函数时当传入的参数为property属性或函数时,会先执行property属性或函数的代码,后再将其返回值传入到函数执行,因此这里与远程主机的操作都放在_inner闭包函数中。
二、封装集体线程等待(红绿灯)
用例背景:在用多线程对多台设备操作时,有一个需求时当所有设备都到达某一状态时等待 去执行某个操作,操作执行完后各线程再执行后续操作,首先想到的就是threading.Event,但监测所有线程都达到wait后执行操作再放行这一需求,如果按常理每遇到一次就写个死循环去监测实在不够优雅,于是封装了一个模块,实现等待和放行的地方仅需一行代码完成需求。
下面,看示例代码:
1 import time 2 from threading import Event 3 4 5 __all__ = [ 6 "ThreadEvent" 7 ] 8 9 10 _event_map = {} 11 12 13 class ThreadEvent: 14 15 def __init__(self, event, name): 16 """ 17 自定义线程事件类,按业务需求对原线程事件二次封装 18 :param event: 原线程事件 19 :param name: 一组使用同一线程事件的标识名称 20 """ 21 event: Event 22 global _event_map 23 24 self._event = event 25 self._name = name 26 27 try: 28 _event_map[self._name]["total"] += 1 29 except KeyError: 30 _event_map[self._name] = { 31 "total": 1, 32 "wait": 0 33 } 34 35 def wait(self): 36 """ 37 线程等待 38 :return: None 39 """ 40 global _event_map 41 _event_map[self._name]["wait"] += 1 42 43 self._event.wait() 44 45 def release(self, func): 46 """ 47 线程放行,当wait的线程数和total数-1(因为有一个负责放行,不会调用wait)一致时,则执行操作后放行,放行后立即锁住 48 :param func: 49 :return: 50 """ 51 global _event_map 52 53 while _event_map[self._name]["wait"] != _event_map[self._name]["total"] - 1: 54 time.sleep(1) 55 56 func() 57 self._event.set() 58 59 _event_map[self._name]["wait"] = 0 60 self._event.clear()
使用方:
1 import time 2 from threading import Thread, Event 3 4 from demo_thread_event.module import ThreadEvent 5 6 7 device_name = 'chroma' 8 9 10 def do_work1(event): 11 event_ = ThreadEvent(event, device_name) 12 while True: 13 print("work【1】开始新一轮工作") 14 # sleep模拟该线程前序操作 15 time.sleep(5) 16 print("work【1】进入等待。。。") 17 event_.wait() 18 19 print("work【1】执行后续操作") 20 # sleep模拟该线程后续操作 21 time.sleep(3) 22 23 24 def do_work2(event): 25 event_ = ThreadEvent(event, device_name) 26 while True: 27 print("work【2】开始新一轮工作") 28 # sleep模拟该线程前序操作 29 time.sleep(10) 30 print("work【2】进入等待。。。") 31 event_.wait() 32 33 print("work【2】执行后续操作") 34 # sleep模拟该线程后续操作 35 time.sleep(2) 36 37 38 if __name__ == '__main__': 39 e = Event() 40 # 1.用原始线程事件创建一个放行事件 41 main_event = ThreadEvent(e, device_name) 42 43 # 2.开启两个线程工作,在需要等待的地方调用wait等待 44 t1 = Thread(target=do_work1, args=(e, )) 45 t2 = Thread(target=do_work2, args=(e, )) 46 47 t1.start() 48 t2.start() 49 50 while True: 51 # 3.一直循环调用放行,sleep 5秒模拟当所有线程都在等待时执行的操作,执行完成后放行 52 main_event.release(lambda :time.sleep(5))
运行结果符合预期:
1 work【1】开始新一轮工作 2 work【2】开始新一轮工作 3 work【1】进入等待。。。 4 work【2】进入等待。。。 5 work【1】执行后续操作work【2】执行后续操作 6 7 work【2】开始新一轮工作 8 work【1】开始新一轮工作 9 work【1】进入等待。。。 10 work【2】进入等待。。。 11 work【1】执行后续操作work【2】执行后续操作 12 13 work【2】开始新一轮工作 14 work【1】开始新一轮工作 15 work【1】进入等待。。。 16 work【2】进入等待。。。
Comments