任务简述
最近因需要用 python 写一个微服务来用 MQTT 给硬件传输图片, 其中 python 用的是 flask 框架, 大概流程如下:
image
协议为:
需要将图片数据封装成多个消息进行传输, 每个消息传输的数据字节数为 1400Byte.
消息 (MQTT Payload) 格式: web 服务器 -------->BASE:
image
反馈: BASE---------> Web 服务器:
image
如果 Web 服务器发送完一个 "数据传输消息" 后, 5S 内没有收到 MQTT"反馈消息" 或者收到的反馈中显示 "数据包不完整", 则重发该 "数据传输消息".
程序流程图
根据上面的协议, 可以得到如下的流程图:
image
推荐一个不错的 Python 编程群: 556370268, 里面都是爱好 Python 编程的小伙伴, 可以在一起学习 Python 最新知识, 一起提升技能, 有问题也能一起解决, 不管是刚学 Python 还是有一定的 Python 基础的小伙伴, 这都是个不错的选择哦.
代码如下:
- # encoding:utf-8
- from flask import Flask, jsonify
- from flask_restful import API, Resource, reqparse
- from PIL import Image
- from io import BytesIO
- import requests
- import os, logging, time
- import paho.mqtt.client as mqtt
- import struct
- from flask_cors import *
- # 日志配置信息
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s (runing by %(funcName)s',
- )
- class Mqtt(object):
- def __init__(self, img_data, size):
- self.MQTTHOST = '*******'
- self.MQTTPORT = "******"
- # 订阅和发送的主题
- self.topic_from_base = 'mqttTestSub'
- self.topic_to_base = 'mqttTestPub'
- self.client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
- self.client = mqtt.Client(self.client_id)
- # 完成链接后的回掉函数
- self.client.on_connect = self.on_connect
- # 图片大小
- self.size = size
- # 用于跳出死循环, 结束任务
- self.finished = None
- # 包的编号
- self.index = 0
- # 将收到的图片数据按大小分成列表
- self.image_data_list = [img_data[x:x + 1400] for x in range(0, self.size, 1400)]
- # 记录发布后的数据, 用于监控时延
- self.pub_time = 0
- self.header_to_base = 0xffffeeee
- self.header_from_base = 0xeeeeffff
- # 功能标识
- self.function_begin = 0x01
- self.function_doing = 0x02
- self.function_finished = 0x03
- # 包的完整和非完整状态
- self.whole_package = 0x01
- self.bad_package = 0x00
- # 头信息的格式, 小端模式
- self.format_to_base = "<Lbhh"
- self.format_from_base = "<Lbhb"
- # 如果重发包时, 用于检查是否重发第一个包
- self.first = True
- # 如果重发包时, 用于检查是否重发最后一个包
- self.last = False
- self.begin_data = 'image.jpg;' + str(self.size)
- # 链接 mqtt 服务器函数
- def on_mqtt_connect(self):
- self.client.connect(self.MQTTHOST, self.MQTTPORT, 60)
- self.client.loop_start()
- # 链接完成后的回调函数
- def on_connect(self, client, userdata, flags, rc):
- logging.info("+++ Connected with result code {} +++".format(str(rc)))
- self.client.subscribe(self.topic_from_base)
- # 订阅函数
- def subscribe(self):
- self.client.subscribe(self.topic_from_base, 1)
- # 消息到来处理函数
- self.client.on_message = self.on_message
- # 接收到信息后的回调函数
- def on_message(self, client, userdata, msg):
- # 如果接受第一个包则不需要重发第一个
- self.first = False
- # 将接受到的包进行解压, 得到一个元组
- base_tuple = struct.unpack(self.format_from_base, msg.payload)
- logging.info("+++ imageData's letgth is {}, base_tupe is {} +++".format(self.size, base_tuple))
- logging.info("+++ package_number is {}, package_status_from_base is {} +++"
- .format(base_tuple[2], base_tuple[3]))
- # 检查接受到信息的头部是否正确
- if base_tuple[0] == self.header_from_base:
- logging.info("+++ function_from_base is {} +++".format(base_tuple[1]))
- # 是否完成传输, 如果完成则退出
- if base_tuple[1] == self.function_finished:
- logging.info("+++ finish work +++")
- self.finished = 1
- self.client.disconnect()
- else:
- # 是否是最后一个包
- if self.index == len(self.image_data_list) - 1:
- self.publish('finished', self.function_finished)
- self.last = True
- logging.info("+++ finished_data_to_base is finished+++")
- else:
- # 如果接收到的包不是 0x03 则进行传送数据
- if base_tuple[1] == self.function_begin or base_tuple[1] == self.function_doing:
- logging.info("+++ package_number is {}, package_status_from_base is {} +++"
- .format(base_tuple[2],base_tuple[3]))
- # 如果数据的反馈中, 包的状态是 1 则继续发下一个包
- if base_tuple[3] == self.whole_package:
- self.publish(self.index, self.function_doing)
- logging.info("+++ data_to_base is finished+++")
- self.index += 1
- # 如果数据的反馈中, 包的状态是 0 则重发数据包
- elif base_tuple[3] == self.bad_package:
- re_package_number = base_tuple[2]
- self.publish(re_package_number-1, self.function_doing)
- logging.info("+++ re_data_to_base is finished+++")
- else:
- logging.info("+++ package_status_from_base is not 0 or 1 +++")
- self.client.disconnect()
- else:
- logging.info("+++ function_identifier is illegal +++")
- self.client.disconnect()
- else:
- logging.info("+++ header_from_base is illegal +++")
- self.client.disconnect()
- # 数据发送函数
- def publish(self, index, fuc):
- # 看是否是最后一个包
- if index == 'finished':
- length = 0
- package_number = 0
- data = b''
- else:
- length = len(self.image_data_list[index])
- package_number = index
- data = self.image_data_list[index]
- # 打包数据头信息
- buffer = struct.pack(
- self.format_to_base,
- self.header_to_base,
- fuc,
- package_number,
- length
- )
- to_base_data = buffer + data
- # mqtt 发送
- self.client.publish(
- self.topic_to_base,
- to_base_data
- )
- self.pub_time = time.time()
- # 发送第一个包函数
- def publish_begin(self):
- buffer = struct.pack(
- self.format_to_base,
- self.header_to_base,
- self.function_begin,
- 0,
- len(self.begin_data.encode('utf-8')),
- )
- begin_data = buffer + self.begin_data.encode('utf-8')
- self.client.publish(self.topic_to_base, begin_data)
- # 控制函数
- def control(self):
- self.on_mqtt_connect()
- self.publish_begin()
- begin_time = time.time()
- self.pub_time = time.time()
- self.subscribe()
- while True:
- time.sleep(1)
- # 超过 5 秒重传
- date = time.time() - self.pub_time
- if date> 5:
- # 是否重传第一个包
- if self.first == True:
- self.publish_begin()
- logging.info('+++ this is timeout first_data +++')
- # 是否重传最后一个包
- elif self.last == True:
- self.publish('finished', self.function_finished)
- logging.info('+++ this is timeout last_data +++')
- else:
- self.publish(self.index-1, self.function_doing)
- logging.info('+++ this is timeout middle_data +++')
- if self.finished == 1:
- logging.info('+++ all works is finished+++')
- break
- print(str(time.time()-begin_time) + 'begin_time - end_time')
- App = Flask(__name__)
- API = API(App)
- CORS(App, supports_credentials=True)
- # 接受参数
- parser = reqparse.RequestParser()
- parser.add_argument('url', help='mqttImage url', location='args', type=str)
- class GetImage(Resource):
- # 得到参数并从图床下载到本地
- def get(self):
- args = parser.parse_args()
- url = args.get('url')
- response = requests.get(url)
- # 获取图片
- image = Image.open(BytesIO(response.content))
- # 存取图片
- add = os.path.join(os.path.abspath(''),'image.jpg')
- image.save(add)
- # 得到图片大小
- size = os.path.getsize(add)
- f = open(add, 'rb')
- imageData = f.read()
- f.close()
- # 进行 mqtt 传输
- mqtt = Mqtt(imageData, size)
- mqtt.control()
- # 删除文件
- os.remove(add)
- logging.info('*** the result of control is {} ***'.format(1))
- return jsonify({
- "imageData": 1
- })
- API.add_resource(GetImage, '/image')
- if __name__ == '__main__':
- App.run(debug=True, host='0.0.0.0')
来源: http://www.jianshu.com/p/39ba9cff63a0