云通讯Python SDK用python2编写,看了写的实在糟糕,大量代码冗余。另外由于编码问题,用在python3上改动比较大。干脆自己改写了一个python3版本
#-*- coding: UTF-8 -*- # Copyright (c) 2014 The CCP project authors. All Rights Reserved. # # Use of this source code is governed by a Beijing Speedtong Information Technology Co.,Ltd license # that can be found in the LICENSE file in the root of the web site. # # http://www.yuntongxun.com # # An additional intellectual property rights grant can be found # in the file PATENTS. All contributing project authors may # be found in the AUTHORS file in the root of the source tree. from hashlib import md5 import base64 import datetime import json import requests from xmltojson import xmltojson class REST: AccountSid='' AccountToken='' AppId='' SubAccountSid='' SubAccountToken='' ServerIP='' ServerPort='' SoftVersion='' Iflog=False #是否打印日志 Batch='' #时间戳 BodyType = 'xml'#包体格式,可填值:json 、xml # 初始化 # @param serverIP 必选参数 服务器地址 # @param serverPort 必选参数 服务器端口 # @param softVersion 必选参数 REST版本号 def __init__(self,ServerIP,ServerPort,SoftVersion): self.ServerIP = ServerIP self.ServerPort = ServerPort self.SoftVersion = SoftVersion # 设置主帐号 # @param AccountSid 必选参数 主帐号 # @param AccountToken 必选参数 主帐号Token def setAccount(self,AccountSid,AccountToken): self.AccountSid = AccountSid self.AccountToken = AccountToken # 设置子帐号 # # @param SubAccountSid 必选参数 子帐号 # @param SubAccountToken 必选参数 子帐号Token def setSubAccount(self,SubAccountSid,SubAccountToken): self.SubAccountSid = SubAccountSid self.SubAccountToken = SubAccountToken # 设置应用ID # # @param AppId 必选参数 应用ID def setAppId(self,AppId): self.AppId = AppId def log(self,url,body,data): print('这是请求的URL:') print (url) print('这是请求包体:') print (body) print('这是响应包体:') print (data) print('********************************') def eval_func(self, diff_url_path, body, multiple_url_param="", headers={}): """公共函数""" self.accAuth() nowdate = datetime.datetime.now() self.Batch = nowdate.strftime("%Y%m%d%H%M%S") # 生成sig signature = self.AccountSid + self.AccountToken + self.Batch sig = md5(signature.encode("utf-8")).hexdigest().upper() # 拼接url url = "https://%s:%s/%s/Accounts/%s/%s?sig=%s%s" % (self.ServerIP, self.ServerPort, self.SoftVersion, self.AccountSid, diff_url_path, sig, multiple_url_param) # 生成auth src = self.AccountSid + ":" + self.Batch # auth = base64.encodestring(src).strip() auth = base64.encodebytes(src.encode("utf-8")).decode("utf-8").strip() # headers if not headers: headers = {"Authorization": auth, "Content-Type": "text/xml"} else: headers = headers.update({"Authorization": auth}) try: res = requests.post(url, headers=headers, data=body) data = res.content if self.BodyType=='json': #json格式 locations = json.loads(data) else: #xml格式 xtj=xmltojson() locations=xtj.main(data) if self.Iflog: self.log(url,body,data) return locations except Exception as e: if self.Iflog: self.log(url,body,"") return {'172001':'网络错误'} # 创建子账号 # @param friendlyName 必选参数 子帐号名称 def CreateSubAccount(self, friendlyName): # xml格式 body = '''<?xml version="1.0" encoding="utf-8"?><SubAccount><appId>%s</appId>\ <friendlyName>%s</friendlyName>\ </SubAccount>\ ''' % (self.AppId, friendlyName) if self.BodyType == 'json': # json格式 body = '''{"friendlyName": "%s", "appId": "%s"}''' % (friendlyName, self.AppId) return self.eval_func("SubAccounts", body) # 获取子帐号 # @param startNo 可选参数 开始的序号,默认从0开始 # @param offset 可选参数 一次查询的最大条数,最小是1条,最大是100条 def getSubAccounts(self, startNo,offset): # xml格式 body = '''<?xml version="1.0" encoding="utf-8"?><SubAccount><appId>%s</appId>\ <startNo>%s</startNo><offset>%s</offset>\ </SubAccount>\ ''' % (self.AppId, startNo, offset) if self.BodyType == 'json': # json格式 body = '''{"appId": "%s", "startNo": "%s", "offset": "%s"}''' % (self.AppId, startNo, offset) return self.eval_func("GetSubAccounts", body) # 子帐号信息查询 # @param friendlyName 必选参数 子帐号名称 def querySubAccount(self, friendlyName): # 创建包体 body = '''<?xml version="1.0" encoding="utf-8"?><SubAccount><appId>%s</appId>\ <friendlyName>%s</friendlyName>\ </SubAccount>\ ''' % (self.AppId, friendlyName) if self.BodyType == 'json': body = '''{"friendlyName": "%s", "appId": "%s"}''' % (friendlyName, self.AppId) return self.eval_func("QuerySubAccountByName", body) # 发送模板短信 # @param to 必选参数 短信接收彿手机号码集合,用英文逗号分开 # @param datas 可选参数 内容数据 # @param tempId 必选参数 模板Id def sendTemplateSMS(self, to,datas,tempId): # 创建包体 b = '' for a in datas: b += '<data>%s</data>' % (a) body = '<?xml version="1.0" encoding="utf-8"?><SubAccount><datas>' + b + '</datas><to>%s</to><templateId>%s</templateId><appId>%s</appId>\ </SubAccount>\ ' % (to, tempId, self.AppId) if self.BodyType == 'json': # if this model is Json ..then do next code b = '[' for a in datas: b += '"%s",' % (a) b += ']' body = '''{"to": "%s", "datas": %s, "templateId": "%s", "appId": "%s"}''' % (to, b, tempId, self.AppId) return self.eval_func("SMS/TemplateSMS", body) # 外呼通知 # @param to 必选参数 被叫号码 # @param mediaName 可选参数 语音文件名称,格式 wav。与mediaTxt不能同时为空。当不为空时mediaTxt属性失效。 # @param mediaTxt 可选参数 文本内容 # @param displayNum 可选参数 显示的主叫号码 # @param playTimes 可选参数 循环播放次数,1-3次,默认播放1次。 # @param respUrl 可选参数 外呼通知状态通知回调地址,云通讯平台将向该Url地址发送呼叫结果通知。 # @param userData 可选参数 用户私有数据 # @param maxCallTime 可选参数 最大通话时长 # @param speed 可选参数 发音速度 # @param volume 可选参数 音量 # @param pitch 可选参数 音调 # @param bgsound 可选参数 背景音编号 def landingCall(self,to,mediaName,mediaTxt,displayNum,playTimes,respUrl,userData,maxCallTime,speed,volume,pitch,bgsound): # 创建包体 body = '''<?xml version="1.0" encoding="utf-8"?><LandingCall>\ <to>%s</to><mediaName>%s</mediaName><mediaTxt>%s</mediaTxt><appId>%s</appId><displayNum>%s</displayNum>\ <playTimes>%s</playTimes><respUrl>%s</respUrl><userData>%s</userData><maxCallTime>%s</maxCallTime><speed>%s</speed> <volume>%s</volume><pitch>%s</pitch><bgsound>%s</bgsound></LandingCall>\ ''' % (to, mediaName, mediaTxt, self.AppId, displayNum, playTimes, respUrl, userData, maxCallTime, speed, volume, pitch, bgsound) if self.BodyType == 'json': body = '''{"to": "%s", "mediaName": "%s","mediaTxt": "%s","appId": "%s","displayNum": "%s","playTimes": "%s","respUrl": "%s","userData": "%s","maxCallTime": "%s","speed": "%s","volume": "%s","pitch": "%s","bgsound": "%s"}''' % ( to, mediaName, mediaTxt, self.AppId, displayNum, playTimes, respUrl, userData, maxCallTime, speed, volume, pitch, bgsound) return self.eval_func("Calls/LandingCalls", body) # 语音验证码 # @param verifyCode 必选参数 验证码内容,为数字和英文字母,不区分大小写,长度4-8位 # @param playTimes 可选参数 播放次数,1-3次 # @param to 必选参数 接收号码 # @param displayNum 可选参数 显示的主叫号码 # @param respUrl 可选参数 语音验证码状态通知回调地址,云通讯平台将向该Url地址发送呼叫结果通知 # @param lang 可选参数 语言类型 # @param userData 可选参数 第三方私有数据 def voiceVerify(self,verifyCode,playTimes,to,displayNum,respUrl,lang,userData): # 创建包体 body = '''<?xml version="1.0" encoding="utf-8"?><VoiceVerify>\ <appId>%s</appId><verifyCode>%s</verifyCode><playTimes>%s</playTimes><to>%s</to><respUrl>%s</respUrl>\ <displayNum>%s</displayNum><lang>%s</lang><userData>%s</userData></VoiceVerify>\ ''' % (self.AppId, verifyCode, playTimes, to, respUrl, displayNum, lang, userData) if self.BodyType == 'json': # if this model is Json ..then do next code body = '''{"appId": "%s", "verifyCode": "%s","playTimes": "%s","to": "%s","respUrl": "%s","displayNum": "%s","lang": "%s","userData": "%s"}''' % ( self.AppId, verifyCode, playTimes, to, respUrl, displayNum, lang, userData) return self.eval_func("Calls/VoiceVerify", body) # IVR外呼 # @param number 必选参数 待呼叫号码,为Dial节点的属性 # @param userdata 可选参数 用户数据,在<startservice>通知中返回,只允许填写数字字符,为Dial节点的属性 # @param record 可选参数 是否录音,可填项为true和false,默认值为false不录音,为Dial节点的属性 def ivrDial(self,number,userdata,record): # 创建包体 body = '''<?xml version="1.0" encoding="utf-8"?> <Request> <Appid>%s</Appid> <Dial number="%s" userdata="%s" record="%s"></Dial> </Request> ''' % (self.AppId, number, userdata, record) return self.eval_func("ivr/dial", body) # 话单下载 # @param date 必选参数 day 代表前一天的数据(从00:00 – 23:59),目前只支持按天查询 # @param keywords 可选参数 客户的查询条件,由客户自行定义并提供给云通讯平台。默认不填忽略此参数 def billRecords(self,date,keywords): # 创建包体 body = '''<?xml version="1.0" encoding="utf-8"?><BillRecords>\ <appId>%s</appId><date>%s</date><keywords>%s</keywords>\ </BillRecords>\ ''' % (self.AppId, date, keywords) return self.eval_func("BillRecords", body) # 主帐号信息查询 def queryAccountInfo(self): body = '' return self.eval_func("AccountInfo", body) # 短信模板查询 # @param templateId 必选参数 模板Id,不带此参数查询全部可用模板 def QuerySMSTemplate(self,templateId): # 创建包体 body = '''<?xml version="1.0" encoding="utf-8"?><Request>\ <appId>%s</appId><templateId>%s</templateId></Request> ''' % (self.AppId, templateId) return self.eval_func("SMS/QuerySMSTemplate", body) # 呼叫结果查询 # @param callsid 必选参数 呼叫ID def CallResult(self,callSid): body = '' return self.eval_func("CallResult", body, multiple_url_param="&callsid=%s" % callSid) # 呼叫状态查询 # @param callid 必选参数 一个由32个字符组成的电话唯一标识符 # @param action 可选参数 查询结果通知的回调url地址 def QueryCallState (self,callid,action): # 创建包体 body = '''<?xml version="1.0" encoding="utf-8"?><Request>\ <Appid>%s</Appid><QueryCallState callid="%s" action="%s"/>\ </Request>\ ''' % (self.AppId, callid, action) return self.eval_func("ivr/call", body, multiple_url_param="&callid=%s" % callid) # 语音文件上传 # @param filename 必选参数 文件名 # @param body 必选参数 二进制串 def MediaFileUpload (self,filename,body): if self.BodyType == 'json': headers = {"Accept":"application/json", "Content-Type":"application/octet-stream"} else: headers = {"Accept":"application/xml", "Content-Type":"application/octet-stream"} return self.eval_func("Calls/MediaFileUpload", body, multiple_url_param="&filename=%s" % filename, headers=headers) #子帐号鉴权 def subAuth(self): if(self.ServerIP==""): print('172004') print('IP为空') if(self.ServerPort<=0): print('172005') print('端口错误(小于等于0)') if(self.SoftVersion==""): print('172013') print('版本号为空') if(self.SubAccountSid==""): print('172008') print('子帐号为空') if(self.SubAccountToken==""): print('172009') print('子帐号令牌为空') if(self.AppId==""): print('172012') print('应用ID为空') #主帐号鉴权 def accAuth(self): if(self.ServerIP==""): print('172004') print('IP为空') if(self.ServerPort<="0"): print('172005') print('端口错误(小于等于0)') if(self.SoftVersion==""): print('172013') print('版本号为空') if(self.AccountSid==""): print('172006') print('主帐号为空') if(self.AccountToken==""): print('172007') print('主帐号令牌为空') if(self.AppId==""): print('172012') print('应用ID为空') #设置包头 def setHttpHeader(self,req): if self.BodyType == 'json': req.add_header("Accept", "application/json") req.add_header("Content-Type", "application/json;charset=utf-8") else: req.add_header("Accept", "application/xml") req.add_header("Content-Type", "application/xml;charset=utf-8")
附原 xmltojson.py
# -*- coding: utf-8 -*- #python xml.etree.ElementTree import xml.etree.ElementTree as ET class xmltojson: #global var #show log SHOW_LOG = True #XML file XML_PATH = None a={} m=[] def get_root(self,path): '''parse the XML file,and get the tree of the XML file finally,return the root element of the tree. if the XML file dose not exist,then print the information''' #if os.path.exists(path): #if SHOW_LOG: #print('start to parse the file : [{}]'.format(path)) tree = ET.fromstring(path) return tree #else: #print('the path [{}] dose not exist!'.format(path)) def get_element_tag(self,element): '''return the element tag if the element is not None.''' if element is not None: return element.tag else: print('the element is None!') def get_element_attrib(self,element): '''return the element attrib if the element is not None.''' if element is not None: return element.attrib else: print('the element is None!') def get_element_text(self,element): '''return the text of the element.''' if element is not None: return element.text else: print('the element is None!') def get_element_children(self,element): '''return the element children if the element is not None.''' if element is not None: return [c for c in element] else: print('the element is None!') def get_elements_tag(self,elements): '''return the list of tags of element's tag''' if elements is not None: tags = [] for e in elements: tags.append(e.tag) return tags else: print('the elements is None!') def get_elements_attrib(self,elements): '''return the list of attribs of element's attrib''' if elements is not None: attribs = [] for a in elements: attribs.append(a.attrib) return attribs else: print('the elements is None!') def get_elements_text(self,elements): '''return the dict of element''' if elements is not None: text = [] for t in elements: text.append(t.text) return dict(zip(self.get_elements_tag(elements), text)) else: print('the elements is None!') def main(self,xml): #root root = self.get_root(xml) #children children = self.get_element_children(root) children_tags = self.get_elements_tag(children) children_attribs = self.get_elements_attrib(children) i=0 #获取二级元素的每一个子节点的名称和值 for c in children: p=0 c_children = self.get_element_children(c) dict_text = self.get_elements_text(c_children) if dict_text : #print (children_tags[i]) if children_tags[i] =='TemplateSMS': self.a['templateSMS']=dict_text else : if children_tags[i]=='SubAccount': k=0 for x in children: if children_tags[k]=='totalCount': self.m.append(dict_text) self.a['SubAccount']=self.m p=1 k=k+1 if p==0: self.a[children_tags[i]]=dict_text else: self.a[children_tags[i]]=dict_text else: self.a[children_tags[i]]=c.text i=i+1 return self.a def main2(self,xml): #root root = self.get_root(xml) #children children = self.get_element_children(root) children_tags = self.get_elements_tag(children) children_attribs = self.get_elements_attrib(children) i=0 #获取二级元素的每一个子节点的名称和值 for c in children: p=0 c_children = self.get_element_children(c) dict_text = self.get_elements_text(c_children) if dict_text : if children_tags[i] =='TemplateSMS': k=0 for x in children: if children_tags[k]=='totalCount': self.m.append(dict_text) self.a['TemplateSMS']=self.m p=1 k=k+1 if p==0: self.a[children_tags[i]]=dict_text else: self.a[children_tags[i]]=dict_text else: self.a[children_tags[i]]=c.text i=i+1 return self.a
Comments