一, 背景
目前有在项目分组, 就小组成员中, 微信群消息回复较多的情况下, 想根据组来转发特定消息, 包含文字, 图片, 语言等. 在此只是自己实现仅供参考, 可以根据自身需求修改更多功能.
二, 代码
2.1 企业微信相关信息
企业 ID:corpid
自建应用 appid
自建应用 secret
获取 access_token api https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRECT
发送消息 api https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=ACCESS_TOKEN
2.2 服务端部署
运行环境:
python 版本 2.7
- git clone https://github.com/redhatxl/wechatmsg.git
- nohup python2.7 wechatmsg/wx_msg_server.py &
2.3 参考 RUL:
获取 access_token https://work.weixin.qq.com/api/doc#10013
发送消息 https://work.weixin.qq.com/api/doc#10167
2.4 代码
核心代码
github 地址 https://github.com/redhatxl/wechatmsg
- # flask 框架后台
- def server_run(self):
- app = Flask(__name__)
- @app.route('/index', methods=['GET', 'POST'])
- def index():
- wxcpt = WXBizMsgCrypt(self.sToken, self.sEncodingAESKey, self.sCorpID)
- # 获取 url 验证时微信发送的相关参数
- sVerifyMsgSig = request.args.get('msg_signature')
- sVerifyTimeStamp = request.args.get('timestamp')
- sVerifyNonce = request.args.get('nonce')
- sVerifyEchoStr = request.args.get('echostr')
- # 验证 url
- if request.method == 'GET':
- ret, sEchoStr = wxcpt.VerifyURL(sVerifyMsgSig, sVerifyTimeStamp, sVerifyNonce, sVerifyEchoStr)
- print type(ret)
- print type(sEchoStr)
- if (ret != 0):
- print "ERR: VerifyURL ret:" + str(ret)
- sys.exit(1)
- return sEchoStr
- # 接收客户端消息
- if request.method == 'POST':
- sReqMsgSig = sVerifyMsgSig
- sReqTimeStamp = sVerifyTimeStamp
- sReqNonce = sVerifyNonce
- sReqData = request.data
- print(sReqData)
- ret, sMsg = wxcpt.DecryptMsg(sReqData, sReqMsgSig, sReqTimeStamp, sReqNonce)
- print ret, sMsg
- if (ret != 0):
- print "ERR: DecryptMsg ret:" + str(ret)
- sys.exit(1)
- # 解析发送的内容并打印
- xml_tree = ET.fromstring(sMsg)
- print('xml_tree is', xml_tree)
消息内容发送
- def _send_text_msg(self, content):
- data = {
- "touser": ('|').join(self.userid.split(',')),
- "toparty": ('|').join(self.partid.split(',')),
- # "toparty":int(self.partid),
- "msgtype": "text",
- "agentid": self.agent_id,
- "text": {
- "content": content
- },
- "safe": 0
- }
- try:
- response = requests.post(self.send_msg_url.format(self.access_token), json.dumps(data))
- self.logoper.info(response.text)
- print(response.text)
- result_msg = json.loads(response.content)['errmsg']
- return result_msg
- except Exception as e:
- self.logoper.info(e)
日志
- def create_dir(self):
- """
- 创建目录
- :return: 文件名称
- """
- _LOGDIR = os.path.join(os.path.dirname(__file__), self.logdir_name)
- _TIME = time.strftime('%Y-%m-%d', time.gmtime()) + '-'
- _LOGNAME = _TIME + self.logfile_name
- LOGFILENAME = os.path.join(_LOGDIR, _LOGNAME)
- if not os.path.exists(_LOGDIR):
- os.mkdir(_LOGDIR)
- return LOGFILENAME
- def create_logger(self, logfilename):
- """
- 创建 logger 对象
- :param logfilename:
- :return: logger 对象
- """
- logger = logging.getLogger()
- logger.setLevel(logging.INFO)
- handler = logging.FileHandler(logfilename)
- handler.setLevel(logging.INFO)
- formater = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- handler.setFormatter(formater)
- logger.addHandler(handler)
- return logger
配置文件
- # 定义微信公众号信息
- [common]
- # 企业微信企业 ID
- corpid = wxe23xxxxxxxxxxx
- # 接收消息服务器配置
- [recmsg]
- Token = mVNAAw3xxxxxxxxxxxxxxxxx
- EncodingAESKey = vwbKImxc3WPeE073xxxxxxxxxxxxxxxxxx
- # 自建应用信息
- [appconfig]
- # 自建应用 agentid
- agentid = 1000002
- # 自建应用 secret
- secret = 6HAGX7Muw36pv5anxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
- # 消息接收信息
- # 消息接收用户 id, 如果多个用户用英文','隔开
- userid = xuel|yaoy
- # 消息接收部门 id, 如果多个用英文','隔开
- partid = 11
- [urlconfig]
- # 获取应用 token 的 api 接口
- get_access_token_url = https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}
- # 发送消息 api 接口
- send_msg_url = https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}
- # 上传媒体 api 接口, 获取 mediaid
- upload_media_url = https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={}&type=image
- # 上传高清语音接口
- upload_video_url = https://qyapi.weixin.qq.com/cgi-bin/media/get/jssdk?access_token={}&media_id={}
- [loginfo]
- # 日志目录
- logdir_name = logdir
- # 日志文件名称
- logfile_name = wechat_server.log
三, 测试
在企业微信发送消息, 可以修改配置文件制定转发到特定的群组, 从而避免消息分流.
启用应用 API, 设置回调地址
测试发送消息
查看接受消息
四, 优化
后期可以配合数据库将每次获取的 access_token 保存至数据库, 待 2 小时过期后, 再重新获取新的.
更多内容转发
来源: http://blog.51cto.com/kaliarch/2156249