mqtt笔记


mqtt中间件官网:

https://www.emqx.io/docs/zh/v4.0/

 订阅代码例子:

package org.qn.util.mqtt;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecgframework.core.util.ApplicationContextUtil;
import org.jeecgframework.core.util.StringUtil;
import org.jeecgframework.web.rest.app.controller.AccessRestController;
import org.jeecgframework.web.system.service.SystemService;
import org.qn.app.entity.TbLienroomAppEntity;
import org.qn.khry.entity.TbAppUserEntity;
import org.qn.khry.service.TbAppUserServiceI;
import org.qn.log.entity.TbAccessLogEntity;
import org.qn.log.entity.TbAccessMainLogEntity;
import org.qn.log.service.TbAccessMainLogServiceI;

public class MqttUtils implements MqttCallbackExtended{
    private static TbAccessMainLogServiceI tbAccessMainLogService = ApplicationContextUtil.getContext().getBean(TbAccessMainLogServiceI.class);
    private static TbAppUserServiceI tbAppUserService = ApplicationContextUtil.getContext().getBean(TbAppUserServiceI.class);
    // MQTT服务器地址及端口、连接协议
    private static String broker = "tcp://10.211.9.160:1883"; // 10.211.9.160
    // MQTT连接用户
    private static String username = "admin";
    // MQTT连接密码
    private static String password = "admin";
    // 客户端实例
    private MqttClient client;
    // QoS
    private int qos;
    // 发送后是否保留
    private boolean retained;
    // 需要订阅的主题
    private List subscirbeTopics;
    // 客户端名称
    private String clientName;

    /**
     * 
     * @param broker   服务地址,例如:tcp://10.211.9.135:1883
     * @param username 登录名
     * @param password 登录密码
     */
//    public MqttUtils(String broker, String username, String password) {
//        this.clientName = UUID.randomUUID().toString().replaceAll("-", "");
//        this.broker = broker;
//        this.username = username;
//        this.password = password;
//        this.subscirbeTopics = new ArrayList<>();
//        // 默认为 0 - 至多一次
//        this.qos = 0;
//        // 默认不保留
//        this.retained = false;
//    }
    /**
          * 默认地址
     */
    public MqttUtils() {
        this.clientName = UUID.randomUUID().toString().replaceAll("-", "");
        this.subscirbeTopics = new ArrayList<>();
        // 默认为 0 - 至多一次
        this.qos = 0;
        // 默认不保留
        this.retained = false;
    }

    public void connect() {
        try {
            this.client = new MqttClient(broker, clientName);
        } catch (MqttException e) {
            e.printStackTrace();
        }
//        MqttConnectOptions options = new MqttConnectOptions();
//        options.setUserName(this.username);
//        options.setPassword(password.toCharArray());
//        options.setCleanSession(true);
//        client.setCallback(this);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(this.username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(10);
        options.setKeepAliveInterval(90);
        client.setCallback(this);

        try {
            client.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 
     * @param topic    消息要发送的主题
     * @param content  消息内容
     * @param isClosed 消息发送后是否关闭客户端连接
     */
    public void publish(String topic, String content, boolean isClosed) {
        if (this.client == null || !client.isConnected()) {
            System.out.println("客户端连接异常!");
            return;
        }
        try {
            client.publish(topic, content.getBytes(), this.qos, this.isRetained());
            System.out.printf("消息已发送至主题【%s】!\n", topic);
        } catch (Exception e) {
            System.out.println("消息发布异常:" + e.getMessage());
        }
        if (isClosed) {
            try {
                client.disconnect();
                client.close();
            } catch (MqttException e) {
                System.out.println("资源释放失败:" + e.getMessage());
            }
        }
    }

    @Override
    public void connectionLost(Throwable arg0) {
        System.out.println("连接已丢失!异常信息:" + arg0.getMessage());
        if (client != null) {
            if (!client.isConnected()) {
                try {
                    client.reconnect();
                    System.out.println("服务器正在重连!");
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken arg0) {
        try {
            System.out.printf("消息#%d已发布至主题【%s】,内容:%s!\n", arg0.getMessageId(), String.join(",", arg0.getTopics()),
                    new String(arg0.getMessage().getPayload()));
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 处理订阅的消息
     * 
     * @param arg0
     * @param arg1
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.printf("接收到来自【%s】的消息,消息长度:%d\n", topic, new String(message.getPayload()).length());
        //人脸下发回调
        System.out.println("==========message-mqtt========="+message);
        Map dataMapUpdate =App.dataMapUpdate;
        if(dataMapUpdate.containsKey(topic)) { //{"messageId":"8a166032b7f9493ca9fc675b2c2e76b1","operator":"EditPerson","info":{"birthday":"","cardValidEnd":"","address":"","cardNum2":"","notes":"","gender":1,"cardValidBegin":"","cardType2":0,"nation":1,"idCard":"522726198907214915","cardType":0,"customId":"522726198907214915","tempCardType":0,"EffectNumber":10,"telnum1":"","native":"","picURI":"http://172.16.20.111:5000/faces/522726198907214915_real.jpg","name":"柏秀华","personId":"1","personType":0}}
            JSONObject msgObj = JSON.parseObject(new String(message.getPayload()));
            JSONObject info = msgObj.getJSONObject("info");
            String idCard = info.getString("idCard");
            String name = info.getString("name");
            // mqtt/face/1702872
            TbLienroomAppEntity o = dataMapUpdate.get(topic);
            List tbAppUsers = tbAccessMainLogService.findHql("FROM TbAppUserEntity WHERE appId = ? AND userName = ?", o.getAppIp(),idCard);
            if(null != tbAppUsers && tbAppUsers.size()>0) {
                TbAppUserEntity tbAppUser = tbAppUsers.get(0);
                tbAppUser.setWill1("1");
                tbAppUserService.saveOrUpdate(tbAppUser);
            }
        }
        //记录日志开始
        Map dataMap =App.dataMap;
        if(dataMap.containsKey(topic)) {
            JSONObject msgObj = JSON.parseObject(new String(message.getPayload()));
            JSONObject info = msgObj.getJSONObject("info");
            String idCard = info.getString("idCard");
            String persionName = info.getString("persionName");
            Date time = info.getDate("time");
            
            TbLienroomAppEntity o = dataMap.get(topic);
            TbAccessLogEntity log = new TbAccessLogEntity();
            log.setUsername(idCard);
            log.setAccessDate(time);
            log.setAccess(o.getLienRoomNum());
            log.setAccessType(o.getAppPosition()+"");
            log.setRealname(persionName);
            log.setReason("开门");
            isNewMainLog(log);
        }
    }

    public boolean isNewMainLog(TbAccessLogEntity log) {
        String hql = "from TbAccessMainLogEntity where username = '"+log.getUsername()+"' and exitDate is null";
        List tbAccessLogList = new ArrayList();
        tbAccessLogList.add(log);
        try {
            //查询在区中主日志
            TbAccessMainLogEntity tempMainLog = tbAccessMainLogService.singleResult(hql);
            if(!StringUtil.isNotEmpty(tempMainLog)) {//生成新的main_log并绑定
                tempMainLog = new TbAccessMainLogEntity();
                tempMainLog.setEntryAreas(log.getAccess());
                tempMainLog.setEntryDate(log.getAccessDate());
                tempMainLog.setRealname(log.getRealname());
                tempMainLog.setUserKey(log.getUserKey());
                tempMainLog.setUsername(log.getUsername());
                tbAccessMainLogService.addMain(tempMainLog,tbAccessLogList);
            }else {
                if(tempMainLog.getEntryAreas().equals(log.getAccess())
                        &&"1".equals(log.getAccessType())) {
                    tempMainLog.setExitDate(log.getAccessDate());
                }
                log.setParentId(tempMainLog.getId());
                tbAccessMainLogService.saveOrUpdate(tempMainLog);
                tbAccessMainLogService.save(log);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    
    @Override
    public void connectComplete(boolean arg0, String arg1) {
        if (subscirbeTopics.size() == 0) {
            System.out.println("没有需要订阅的主题。");
            return;
        }
        if (client != null && client.isConnected()) {
            System.out.println("服务器连接成功!开始订阅主题:" + String.join(",", this.subscirbeTopics));
            try {
                // subscirbeTopics.stream().forEach(s -> {
                // });
                client.subscribe(subscirbeTopics.toArray(new String[subscirbeTopics.size()]));
            } catch (MqttException e) {
                System.out.println("订阅主题失败:" + e.getMessage());
            }
        } else {
            System.out.println("客户端尚连接异常!");
        }

    }

    /**
     * 
     * @param base64String 保存base64字符串为图片
     * @param filepath     图片路径
     * @throws IOException
     */
    public static void convertBase64StrToImage(String base64String, String filepath) throws IOException {
        String flag = "data:image/jpeg;base64,";
        if (base64String.contains(flag)) {
            base64String = base64String.substring(flag.length());
        }

        byte[] imageBytes = Base64.getDecoder().decode(base64String);
        File file = new File(filepath);
        File dir = new File(file.getParent());

        if (!dir.exists()) {
            dir.mkdirs();
        }

        if (file.exists()) {
            file.createNewFile();
        }

        FileOutputStream fos = new FileOutputStream(file);
        fos.write(imageBytes);
        fos.flush();
        fos.close();
    }

    /**
     * 把图片转换为Base64方法
     * 
     * @param filepath
     * @return
     * @throws IOException
     */
    public static String convertImageToBase64Str(String filepath) throws IOException {
        String flag = "data:image/jpeg;base64,";
        File file = new File(filepath);
        if (!file.exists()) {
            throw new FileNotFoundException("目标文件不存在!");
        }
        FileInputStream fis = new FileInputStream(file);
        if (fis.available() > 1 << 20) {
            fis.close();
            throw new IllegalArgumentException("目标图片文件过大!不能超过1M!");
        }

        byte[] buffer = new byte[fis.available()];
        fis.read(buffer);
        fis.close();

        return flag + Base64.getEncoder().encodeToString(buffer);
    }

    
    
    public boolean isRetained() {
        return retained;
    }

    public void setRetained(boolean retained) {
        this.retained = retained;
    }

    public MqttClient getClient() {
        return client;
    }

    public void setClient(MqttClient client) {
        this.client = client;
    }

    public String getBroker() {
        return broker;
    }

    public void setBroker(String broker) {
        this.broker = broker;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getQos() {
        return qos;
    }

    public void setQos(int qos) {
        this.qos = qos;
    }

    public List getSubscirbeTopics() {
        return subscirbeTopics;
    }

    public void setSubscirbeTopics(List subscirbeTopics) {
        this.subscirbeTopics = subscirbeTopics;
    }

    public String getClientName() {
        return clientName;
    }

    public void setClientName(String clientName) {
        this.clientName = clientName;
    }
    
}

 安装包:

链接:https://pan.baidu.com/s/1WyEwstdhaCC4s5HtU_-e9g
提取码:5n8b

解压程序包

$ unzip emqx-ubuntu18.04-v4.0.0.zip

启动 EMQ X Broker

$ ./bin/emqx start
emqx 4.0.0 is started successfully!

$ ./bin/emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running
 

停止 EMQ X Broker

$ ./bin/emqx stop
ok

卸载 EMQ X Broker

直接删除 EMQ X 目录即可



查看 Dashboard

EMQ X Dashboard 是一个 Web 应用程序,你可以直接通过浏览器来访问它,无需安装任何其他软件。

当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是 admin,密码是 public