因为 HarmonyOS 暂时没有发现现成的 MQTT JS 包, 所以使用 Java 进行 MQTT 消息的接收, 再由 JS 定时调用 Java 获取已接收到的消息并展示.
想了解更多内容, 请访问:
51CTO 和华为官方战略合作共建的鸿蒙技术社区
https://harmonyos.51cto.com
思路
因为 HarmonyOS 暂时没有发现现成的 MQTT JS 包, 所以使用 Java 进行 MQTT 消息的接收, 再由 JS 定时调用 Java 获取已接收到的消息并展示.
首先是 JS 调用 Java,JS FA(Feature Ability) 调用 Java PA(Particle Ability) 有两种方式, Ability 和 Internal Ability, 这里使用的是第一种 Ability
然后是 Java 端的 Mqtt 消息接收, 使用 paho 的第三方库进行消息接收, 页面启动时 JS 端调用 Java 端实现 Mqtt 消息接收开始, 使用异步挂起, 接收消息并缓存, 随后 JS 端每次调用 Java 端拿到的都是最新缓存的信息
具体代码
hml 页面:
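<div class="container">
    <text>{{ title }}</text>
    <text onclick="mqttMessage">开始 mqtt</text>
    <text onclick="stopMqtt">停止 mqtt</text>
</div>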
JS 代码:
// abilityType 0 selects the "Ability" calling mode (as opposed to Internal Ability).
const ABILITY_TYPE_EXTERNAL = 0;
// syncOption 0 requests a synchronous call on the Java side.
const ACTION_SYNC = 0;
// Message codes; they must match START_MQTT / MQTT_MESSAGE in the Java PlayAbility.
const ACTION_MESSAGE_CODE_START_MQTT = 1001;
const ACTION_MESSAGE_CODE_MQTT_MESSAGE = 1002;
const BUNDLE_NAME = 'com.example.mqttapplication';
const ABILITY_NAME = 'com.example.mqttapplication.PlayAbility';

/**
 * Builds the callAbility request for the given message code.
 * Everything except the message code is identical for every call in this file.
 */
function buildRequest(messageCode) {
    return {
        messageCode: messageCode,
        abilityType: ABILITY_TYPE_EXTERNAL,
        syncOption: ACTION_SYNC,
        bundleName: BUNDLE_NAME,
        abilityName: ABILITY_NAME
    };
}

/** Thin JS wrapper around the Java PlayAbility (Particle Ability). */
export const playAbility = {
    /**
     * Asks the Java side to start receiving MQTT messages.
     * The call is awaited so that a failed start is logged instead of being lost.
     */
    startMqtt: async function() {
        try {
            const result = await FeatureAbility.callAbility(
                buildRequest(ACTION_MESSAGE_CODE_START_MQTT));
            const ret = JSON.parse(result);
            if (ret.code !== 0) {
                console.error('mqtt error code:' + JSON.stringify(ret.code));
            }
        } catch (err) {
            console.error('mqtt start failed:' + err);
        }
    },

    /**
     * Fetches the most recently cached MQTT message from the Java side and
     * writes it to `that.title`.
     *
     * @param that the page instance whose `title` is updated
     */
    mqttMessage: async function(that) {
        try {
            const result = await FeatureAbility.callAbility(
                buildRequest(ACTION_MESSAGE_CODE_MQTT_MESSAGE));
            const ret = JSON.parse(result);
            if (ret.code === 0) {
                const text = 'mqtt is:' + JSON.stringify(ret.abilityResult);
                console.info(text);
                that.title = text;
            } else {
                console.error('mqtt error code:' + JSON.stringify(ret.code));
            }
        } catch (err) {
            // This runs from a timer, so nobody awaits it: handle the failure
            // here rather than leaving an unhandled promise rejection.
            console.error('mqtt message fetch failed:' + err);
        }
    }
}
/** Page view-model: shows the latest MQTT message and polls the Java side. */
export default {
    data: {
        title: "",
        // Handle of the active polling interval, or null when not polling.
        timer: null
    },
    /** One polling step: fetch the latest cached message into `title`. */
    task() {
        playAbility.mqttMessage(this);
    },
    /** Starts polling every 200 ms; restarts cleanly if already polling. */
    mqttMessage() {
        // Drop any previous interval first. Otherwise each extra press would
        // add another timer whose handle is overwritten and can never be cleared.
        this.stopMqtt();
        this.title = "开始获取 MQTT 消息";
        this.task();
        this.timer = setInterval(() => this.task(), 200);
    },
    /** Stops polling; safe to call when no timer is running. */
    stopMqtt() {
        if (this.timer !== null) {
            clearInterval(this.timer);
            this.timer = null;
        }
    },
    /** Page lifecycle hook: do not keep polling after the page is gone. */
    onDestroy() {
        this.stopMqtt();
    }
}
// Start MQTT reception on the Java side as soon as this module is loaded.
playAbility.startMqtt()
Java 端代码 (接收 Mqtt 消息, 异步)
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import java.util.List;
- public class MqttThread implements Runnable {
- /** 地址 */
- public static final String MQTT_BROKER_HOST = "tcp://xxx.xxx.xxx.xxx:1883";
- /** 客户端唯一标识 */
- public static final String MQTT_CLIENT_ID = "client";
- /** 订阅标识 */
- public static final String MQTT_TOPIC = "HarmonyTest";
- /** 客户端 */
- private volatile static MqttClient mqttClient;
- /** 连接选项 */
- private static MqttConnectOptions options;
- /** 消息 */
- private final List message;
- public MqttThread(List message) {
- this.message = message;
- }
- public void run() {
- try {
- mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
- options = new MqttConnectOptions();
- options.setCleanSession(true);
- options.setConnectionTimeout(20);
- options.setKeepAliveInterval(20);
- mqttClient.connect(options);
- mqttClient.subscribe(MQTT_TOPIC);
- mqttClient.setCallback(new MqttCallback() {
- @Override
- public void connectionLost(Throwable throwable) { }
- @Override
- public void messageArrived(String s, MqttMessage mqttMessage) {
- message.clear();
- message.add(mqttMessage.toString());
- System.out.println("接收到 mqtt 消息:" + mqttMessage.toString());
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { }
- });
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
Java 端代码 (Particle Ability)
- import com.example.mqttapplication.mqtt.MqttThread;
- import ohos.aafwk.ability.Ability;
- import ohos.aafwk.content.Intent;
- import ohos.hiviewdfx.HiLog;
- import ohos.hiviewdfx.HiLogLabel;
- import ohos.rpc.*;
- import ohos.utils.zson.ZSONObject;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- public class PlayAbility extends Ability {
- static final HiLogLabel label = new HiLogLabel(HiLog.LOG_APP, 1, "MY_TAG");
- private static final int ERROR = -1;
- private static final int SUCCESS = 0;
- private static final int START_MQTT = 1001;
- private static final int MQTT_MESSAGE = 1002;
- @Override
- protected void onStart(Intent intent) {
- super.onStart(intent);
- }
- @Override
- protected IRemoteObject onConnect(Intent intent) {
- super.onConnect(intent);
- PlayRemote remote = new PlayRemote();
- return remote.asObject();
- }
- static class PlayRemote extends RemoteObject implements IRemoteBroker {
- private List message;
- private Thread thread;
- public PlayRemote() {
- super("PlayRemote");
- }
- @Override
- public boolean onRemoteRequest(int code, MessageParcel data, MessageParcel reply, MessageOption option) {
- // 开始 mqtt
- else if (code == START_MQTT) {
- Map result = new HashMap<>();
- result.put("code", SUCCESS);
- result.put("abilityResult", "成功开始 mqtt");
- try {
- message = new ArrayList<>();
- MqttThread mqttThread = new MqttThread(message);
- thread = new Thread(mqttThread);
- thread.start();
- System.out.println("mqtt 启动成功");
- }
- catch (Exception e) {
- result.put("code", ERROR);
- result.put("abilityResult", "启动失败");
- }
- reply.writeString(ZSONObject.toZSONString(result));
- }
- // 获取 mqtt 消息
- else if (code == MQTT_MESSAGE) {
- Map result = new HashMap<>();
- result.put("code", SUCCESS);
- if (message.isEmpty()) {
- result.put("abilityResult", "未接收到 MQTT 消息");
- }
- else {
- ZSONObject zsonObject = ZSONObject.stringToZSON(message.get(0));
- result.put("abilityResult", zsonObject.getString("message"));
- }
- reply.writeString(ZSONObject.toZSONString(result));
- }
- else {
- Map result = new HashMap<>();
- result.put("abilityError", ERROR);
- reply.writeString(ZSONObject.toZSONString(result));
- return false;
- }
- return true;
- }
- @Override
- public IRemoteObject asObject() {
- return this;
- }
- }
- }
另外, 启动网络连接还需要在 config.json 里添加以下权限声明
- {
- ...
- "module": {
- ...
- "reqPermissions": [
- {
- "name": "ohos.permission.GET_NETWORK_INFO"
- },
- {
- "name": "ohos.permission.INTERNET"
- },
- {
- "name": "ohos.permission.SET_NETWORK_INFO"
- },
- {
- "name": "ohos.permission.MANAGE_WIFI_CONNECTION"
- },
- {
- "name": "ohos.permission.SET_WIFI_INFO"
- },
- {
- "name": "ohos.permission.GET_WIFI_INFO"
- }
- ]
- }
- }
最后写了个 python 的脚本用来发送 mqtt 消息, 很简单就一行
from paho.mqtt import publish

# Publish one JSON test message to the topic the HarmonyOS app subscribes to.
publish.single(
    topic='HarmonyTest',
    payload='{"message":"BongShakalaka"}',
    hostname='xxx.xxx.xxx.xxx',
)
附: mqtt 消息是要有 mqtt 服务器的, 这个就自己搭或者买吧
想了解更多内容, 请访问:
51CTO 和华为官方战略合作共建的鸿蒙技术社区
https://harmonyos.51cto.com
来源: http://developer.51cto.com/art/202108/676145.htm