Python3版云通讯SDK

云通讯Python SDK用python2编写,看了写的实在糟糕,大量代码冗余。另外由于编码问题,用在python3上改动比较大。干脆自己改写了一个python3版本

#-*- coding: UTF-8 -*-
#  Copyright (c) 2014 The CCP project authors. All Rights Reserved.
#
#  Use of this source code is governed by a Beijing Speedtong Information Technology Co.,Ltd license
#  that can be found in the LICENSE file in the root of the web site.
#
#   http://www.yuntongxun.com
#
#  An additional intellectual property rights grant can be found
#  in the file PATENTS.  All contributing project authors may
#  be found in the AUTHORS file in the root of the source tree.

from hashlib import md5
import base64
import datetime
import json
import requests
from xmltojson import xmltojson

class REST:
    
    AccountSid=''
    AccountToken=''
    AppId=''
    SubAccountSid=''
    SubAccountToken=''
    ServerIP=''
    ServerPort=''
    SoftVersion=''
    Iflog=False #是否打印日志
    Batch=''  #时间戳
    BodyType = 'xml'#包体格式,可填值:json 、xml
    
    # 初始化
    # @param serverIP       必选参数    服务器地址
    # @param serverPort     必选参数    服务器端口
    # @param softVersion    必选参数    REST版本号
    def __init__(self,ServerIP,ServerPort,SoftVersion):

        self.ServerIP = ServerIP
        self.ServerPort = ServerPort
        self.SoftVersion = SoftVersion
    
    
    # 设置主帐号
    # @param AccountSid  必选参数    主帐号
    # @param AccountToken  必选参数    主帐号Token
    def setAccount(self,AccountSid,AccountToken):
        self.AccountSid = AccountSid
        self.AccountToken = AccountToken
    

    # 设置子帐号
    # 
    # @param SubAccountSid  必选参数    子帐号
    # @param SubAccountToken  必选参数    子帐号Token
    def setSubAccount(self,SubAccountSid,SubAccountToken):
        self.SubAccountSid = SubAccountSid
        self.SubAccountToken = SubAccountToken

    # 设置应用ID
    # 
    # @param AppId  必选参数    应用ID
    def setAppId(self,AppId):
       self.AppId = AppId
    
    def log(self,url,body,data):
        print('这是请求的URL:')
        print (url)
        print('这是请求包体:')
        print (body)
        print('这是响应包体:')
        print (data)
        print('********************************')

    def eval_func(self, diff_url_path, body, multiple_url_param="", headers={}):
        """公共函数"""
        self.accAuth()
        nowdate = datetime.datetime.now()
        self.Batch = nowdate.strftime("%Y%m%d%H%M%S")
        # 生成sig
        signature = self.AccountSid + self.AccountToken + self.Batch
        sig = md5(signature.encode("utf-8")).hexdigest().upper()
        # 拼接url
        url = "https://%s:%s/%s/Accounts/%s/%s?sig=%s%s" % (self.ServerIP, self.ServerPort, self.SoftVersion, self.AccountSid, diff_url_path, sig, multiple_url_param)
        # 生成auth
        src = self.AccountSid + ":" + self.Batch
        # auth = base64.encodestring(src).strip()
        auth = base64.encodebytes(src.encode("utf-8")).decode("utf-8").strip()
        # headers
        if not headers:
            headers = {"Authorization": auth, "Content-Type": "text/xml"}
        else:
            headers = headers.update({"Authorization": auth})
        try:
            res = requests.post(url, headers=headers, data=body)
            data = res.content
            if self.BodyType=='json':
                #json格式
                locations = json.loads(data)
            else:
                #xml格式
                xtj=xmltojson()
                locations=xtj.main(data)
            if self.Iflog:
                self.log(url,body,data)
            return locations
        except Exception as e:
            if self.Iflog:
                self.log(url,body,"")
            return {'172001':'网络错误'}

    # 创建子账号
    # @param friendlyName   必选参数      子帐号名称
    def CreateSubAccount(self, friendlyName):
        # xml格式
        body = '''<?xml version="1.0" encoding="utf-8"?><SubAccount><appId>%s</appId>\
                <friendlyName>%s</friendlyName>\
                </SubAccount>\
                ''' % (self.AppId, friendlyName)
        if self.BodyType == 'json':
            # json格式
            body = '''{"friendlyName": "%s", "appId": "%s"}''' % (friendlyName, self.AppId)

        return self.eval_func("SubAccounts", body)

    #  获取子帐号
    # @param startNo  可选参数    开始的序号,默认从0开始
    # @param offset 可选参数     一次查询的最大条数,最小是1条,最大是100条
    def getSubAccounts(self, startNo,offset):
        # xml格式
        body = '''<?xml version="1.0" encoding="utf-8"?><SubAccount><appId>%s</appId>\
                <startNo>%s</startNo><offset>%s</offset>\
                </SubAccount>\
                ''' % (self.AppId, startNo, offset)
        if self.BodyType == 'json':
            # json格式
            body = '''{"appId": "%s", "startNo": "%s", "offset": "%s"}''' % (self.AppId, startNo, offset)
        return self.eval_func("GetSubAccounts", body)

    # 子帐号信息查询
    # @param friendlyName 必选参数   子帐号名称
    def querySubAccount(self, friendlyName):
        # 创建包体
        body = '''<?xml version="1.0" encoding="utf-8"?><SubAccount><appId>%s</appId>\
                <friendlyName>%s</friendlyName>\
                </SubAccount>\
                ''' % (self.AppId, friendlyName)
        if self.BodyType == 'json':
            body = '''{"friendlyName": "%s", "appId": "%s"}''' % (friendlyName, self.AppId)
        return self.eval_func("QuerySubAccountByName", body)

    # 发送模板短信
    # @param to  必选参数     短信接收彿手机号码集合,用英文逗号分开
    # @param datas 可选参数    内容数据
    # @param tempId 必选参数    模板Id
    def sendTemplateSMS(self, to,datas,tempId):
        # 创建包体
        b = ''
        for a in datas:
            b += '<data>%s</data>' % (a)

        body = '<?xml version="1.0" encoding="utf-8"?><SubAccount><datas>' + b + '</datas><to>%s</to><templateId>%s</templateId><appId>%s</appId>\
                    </SubAccount>\
                    ' % (to, tempId, self.AppId)
        if self.BodyType == 'json':
            # if this model is Json ..then do next code
            b = '['
            for a in datas:
                b += '"%s",' % (a)
            b += ']'
            body = '''{"to": "%s", "datas": %s, "templateId": "%s", "appId": "%s"}''' % (to, b, tempId, self.AppId)
        return self.eval_func("SMS/TemplateSMS", body)

    # 外呼通知
    # @param to 必选参数    被叫号码
    # @param mediaName 可选参数    语音文件名称,格式 wav。与mediaTxt不能同时为空。当不为空时mediaTxt属性失效。
    # @param mediaTxt 可选参数    文本内容
    # @param displayNum 可选参数    显示的主叫号码
    # @param playTimes 可选参数    循环播放次数,1-3次,默认播放1次。
    # @param respUrl 可选参数    外呼通知状态通知回调地址,云通讯平台将向该Url地址发送呼叫结果通知。
    # @param userData 可选参数    用户私有数据
    # @param maxCallTime 可选参数    最大通话时长
    # @param speed 可选参数    发音速度
    # @param volume 可选参数    音量
    # @param pitch 可选参数    音调
    # @param bgsound 可选参数    背景音编号
    def landingCall(self,to,mediaName,mediaTxt,displayNum,playTimes,respUrl,userData,maxCallTime,speed,volume,pitch,bgsound):
        # 创建包体
        body = '''<?xml version="1.0" encoding="utf-8"?><LandingCall>\
                <to>%s</to><mediaName>%s</mediaName><mediaTxt>%s</mediaTxt><appId>%s</appId><displayNum>%s</displayNum>\
                <playTimes>%s</playTimes><respUrl>%s</respUrl><userData>%s</userData><maxCallTime>%s</maxCallTime><speed>%s</speed>
                <volume>%s</volume><pitch>%s</pitch><bgsound>%s</bgsound></LandingCall>\
                ''' % (to, mediaName, mediaTxt, self.AppId, displayNum, playTimes, respUrl, userData, maxCallTime, speed, volume,
                        pitch, bgsound)
        if self.BodyType == 'json':
            body = '''{"to": "%s", "mediaName": "%s","mediaTxt": "%s","appId": "%s","displayNum": "%s","playTimes": "%s","respUrl": "%s","userData": "%s","maxCallTime": "%s","speed": "%s","volume": "%s","pitch": "%s","bgsound": "%s"}''' % (
            to, mediaName, mediaTxt, self.AppId, displayNum, playTimes, respUrl, userData, maxCallTime, speed, volume,
            pitch, bgsound)
        return self.eval_func("Calls/LandingCalls", body)
    
    # 语音验证码
    # @param verifyCode  必选参数   验证码内容,为数字和英文字母,不区分大小写,长度4-8位
    # @param playTimes  可选参数   播放次数,1-3次
    # @param to 必选参数    接收号码
    # @param displayNum 可选参数    显示的主叫号码
    # @param respUrl 可选参数    语音验证码状态通知回调地址,云通讯平台将向该Url地址发送呼叫结果通知
    # @param lang 可选参数    语言类型
    # @param userData 可选参数    第三方私有数据
    def voiceVerify(self,verifyCode,playTimes,to,displayNum,respUrl,lang,userData):
        # 创建包体
        body = '''<?xml version="1.0" encoding="utf-8"?><VoiceVerify>\
                <appId>%s</appId><verifyCode>%s</verifyCode><playTimes>%s</playTimes><to>%s</to><respUrl>%s</respUrl>\
                <displayNum>%s</displayNum><lang>%s</lang><userData>%s</userData></VoiceVerify>\
                ''' % (self.AppId, verifyCode, playTimes, to, respUrl, displayNum, lang, userData)
        if self.BodyType == 'json':
            # if this model is Json ..then do next code
            body = '''{"appId": "%s", "verifyCode": "%s","playTimes": "%s","to": "%s","respUrl": "%s","displayNum": "%s","lang": "%s","userData": "%s"}''' % (
                        self.AppId, verifyCode, playTimes, to, respUrl, displayNum, lang, userData)
        return self.eval_func("Calls/VoiceVerify", body)

    # IVR外呼
    # @param number  必选参数     待呼叫号码,为Dial节点的属性
    # @param userdata 可选参数    用户数据,在<startservice>通知中返回,只允许填写数字字符,为Dial节点的属性
    # @param record   可选参数    是否录音,可填项为true和false,默认值为false不录音,为Dial节点的属性
    def ivrDial(self,number,userdata,record):
        # 创建包体
        body = '''<?xml version="1.0" encoding="utf-8"?>
                    <Request>
                        <Appid>%s</Appid>
                        <Dial number="%s"  userdata="%s" record="%s"></Dial>
                    </Request>
                ''' % (self.AppId, number, userdata, record)
        return self.eval_func("ivr/dial", body)
    
    # 话单下载
    # @param date   必选参数    day 代表前一天的数据(从00:00 – 23:59),目前只支持按天查询
    # @param keywords  可选参数     客户的查询条件,由客户自行定义并提供给云通讯平台。默认不填忽略此参数
    def billRecords(self,date,keywords):
        # 创建包体
        body = '''<?xml version="1.0" encoding="utf-8"?><BillRecords>\
                <appId>%s</appId><date>%s</date><keywords>%s</keywords>\
                </BillRecords>\
                ''' % (self.AppId, date, keywords)
        return self.eval_func("BillRecords", body)

    # 主帐号信息查询
    def queryAccountInfo(self):
        body = ''
        return self.eval_func("AccountInfo", body)
        
    # 短信模板查询
    # @param templateId  必选参数   模板Id,不带此参数查询全部可用模板
    def QuerySMSTemplate(self,templateId):
        # 创建包体
        body = '''<?xml version="1.0" encoding="utf-8"?><Request>\
                <appId>%s</appId><templateId>%s</templateId></Request>
                ''' % (self.AppId, templateId)
        return self.eval_func("SMS/QuerySMSTemplate", body)
    
    # 呼叫结果查询
    # @param callsid   必选参数    呼叫ID
    def CallResult(self,callSid):
        body = ''
        return self.eval_func("CallResult", body, multiple_url_param="&callsid=%s" % callSid)

    # 呼叫状态查询
    # @param callid   必选参数    一个由32个字符组成的电话唯一标识符
    # @param action      可选参数     查询结果通知的回调url地址 
    def QueryCallState (self,callid,action):
        # 创建包体
        body = '''<?xml version="1.0" encoding="utf-8"?><Request>\
                <Appid>%s</Appid><QueryCallState callid="%s" action="%s"/>\
                </Request>\
                ''' % (self.AppId, callid, action)
        return self.eval_func("ivr/call", body, multiple_url_param="&callid=%s" % callid)

    # 语音文件上传
    # @param filename   必选参数    文件名
    # @param body      必选参数     二进制串
    def MediaFileUpload (self,filename,body):
        if self.BodyType == 'json':
            headers = {"Accept":"application/json", "Content-Type":"application/octet-stream"}
        else:
            headers = {"Accept":"application/xml", "Content-Type":"application/octet-stream"}

        return self.eval_func("Calls/MediaFileUpload", body, multiple_url_param="&filename=%s" % filename, headers=headers)

    #子帐号鉴权
    def subAuth(self):
        if(self.ServerIP==""):
            print('172004')
            print('IP为空')
        
        if(self.ServerPort<=0):
            print('172005')
            print('端口错误(小于等于0)')
        
        if(self.SoftVersion==""):
            print('172013')
            print('版本号为空')
        
        if(self.SubAccountSid==""):
            print('172008')
            print('子帐号为空')
        
        if(self.SubAccountToken==""):
            print('172009')
            print('子帐号令牌为空')
        
        if(self.AppId==""):
            print('172012')
            print('应用ID为空')
    
    #主帐号鉴权
    def accAuth(self):
        if(self.ServerIP==""):
            print('172004')
            print('IP为空')
        
        if(self.ServerPort<="0"):
            print('172005')
            print('端口错误(小于等于0)')
        
        if(self.SoftVersion==""):
            print('172013')
            print('版本号为空')
        
        if(self.AccountSid==""):
            print('172006')
            print('主帐号为空')
        
        if(self.AccountToken==""):
            print('172007')
            print('主帐号令牌为空')
        
        if(self.AppId==""):
            print('172012')
            print('应用ID为空')


    #设置包头
    def setHttpHeader(self,req):
        if self.BodyType == 'json':
            req.add_header("Accept", "application/json")
            req.add_header("Content-Type", "application/json;charset=utf-8")
            
        else:
            req.add_header("Accept", "application/xml")
            req.add_header("Content-Type", "application/xml;charset=utf-8")

附原 xmltojson.py

# -*- coding: utf-8 -*-
#python xml.etree.ElementTree

import xml.etree.ElementTree as ET

class xmltojson:
    #global var
    #show log
    SHOW_LOG = True
    #XML file
    XML_PATH = None
    a={}
    m=[]
    
    def get_root(self,path):
        '''parse the XML file,and get the tree of the XML file
        finally,return the root element of the tree.
        if the XML file dose not exist,then print the information'''
        #if os.path.exists(path):
            #if SHOW_LOG:
                #print('start to parse the file : [{}]'.format(path))
        tree = ET.fromstring(path)
        return tree
        #else:
            #print('the path [{}] dose not exist!'.format(path))
    
    def get_element_tag(self,element):
        '''return the element tag if the element is not None.'''
        if element is not None:
            
            return element.tag
        else:
            print('the element is None!')
    
    def get_element_attrib(self,element):
        '''return the element attrib if the element is not None.'''
        if element is not None:
            
            return element.attrib
        else:
            print('the element is None!')
    
    def get_element_text(self,element):
        '''return the text of the element.'''
        if element is not None:
            return element.text
        else:
            print('the element is None!')
    
    def get_element_children(self,element):
        '''return the element children if the element is not None.'''
        if element is not None:
            
            return [c for c in element]
        else:
            print('the element is None!')
    
    def get_elements_tag(self,elements):
        '''return the list of tags of element's tag'''
        if elements is not None:
            tags = []
            for e in elements:
                tags.append(e.tag)
            return tags
        else:
            print('the elements is None!')
    
    def get_elements_attrib(self,elements):
        '''return the list of attribs of element's attrib'''
        if elements is not None:
            attribs = []
            for a in elements:
                attribs.append(a.attrib)
            return attribs
        else:
            print('the elements is None!')
    
    def get_elements_text(self,elements):
        '''return the dict of element'''
        if elements is not None:
            text = []
            for t in elements:
                text.append(t.text)
            return dict(zip(self.get_elements_tag(elements), text))
        else:
            print('the elements is None!')
    
    def main(self,xml):
        #root
        root = self.get_root(xml)
    
        #children
        children = self.get_element_children(root)

        children_tags = self.get_elements_tag(children)

        children_attribs = self.get_elements_attrib(children)

        i=0

        #获取二级元素的每一个子节点的名称和值
        for c in children:
            p=0
            c_children = self.get_element_children(c)
            dict_text = self.get_elements_text(c_children)
            if dict_text :
                #print (children_tags[i])
                if children_tags[i] =='TemplateSMS':
                    self.a['templateSMS']=dict_text
                else :
                    if children_tags[i]=='SubAccount':
                        k=0
                        
                        for x in children:
                            if children_tags[k]=='totalCount':   
                                self.m.append(dict_text)
                                self.a['SubAccount']=self.m
                                p=1
                            k=k+1
                        if p==0:
                            self.a[children_tags[i]]=dict_text
                    else:
                        self.a[children_tags[i]]=dict_text
            else:
                self.a[children_tags[i]]=c.text
            i=i+1
        return self.a
    
    def main2(self,xml):
        #root
        root = self.get_root(xml)
    
        #children
        children = self.get_element_children(root)

        children_tags = self.get_elements_tag(children)

        children_attribs = self.get_elements_attrib(children)

        i=0

        #获取二级元素的每一个子节点的名称和值
        for c in children:
            p=0
            c_children = self.get_element_children(c)
            dict_text = self.get_elements_text(c_children)
            if dict_text :
                if children_tags[i] =='TemplateSMS':
                    k=0
                        
                    for x in children:
                        if children_tags[k]=='totalCount':   
                            self.m.append(dict_text)
                            self.a['TemplateSMS']=self.m
                            p=1
                        k=k+1
                    if p==0:
                        self.a[children_tags[i]]=dict_text
                else:
                    self.a[children_tags[i]]=dict_text
                    
            else:
                self.a[children_tags[i]]=c.text
            i=i+1
        return self.a

 


  • 作者:合十
  • 发表时间:2021年8月29日 14:53
  • 更新时间:2024年9月18日 15:34
  • 所属分类:我用Python

Comments

该文章还未收到评论,点击下方评论框开始评论吧~