mqtt笔记
mqtt中间件官网:
https://www.emqx.io/docs/zh/v4.0/
订阅代码例子:
package org.qn.util.mqtt; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Paths; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Base64; import java.util.Date; import java.util.List; import java.util.Map; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecgframework.core.util.ApplicationContextUtil; import org.jeecgframework.core.util.StringUtil; import org.jeecgframework.web.rest.app.controller.AccessRestController; import org.jeecgframework.web.system.service.SystemService; import org.qn.app.entity.TbLienroomAppEntity; import org.qn.khry.entity.TbAppUserEntity; import org.qn.khry.service.TbAppUserServiceI; import org.qn.log.entity.TbAccessLogEntity; import org.qn.log.entity.TbAccessMainLogEntity; import org.qn.log.service.TbAccessMainLogServiceI; public class MqttUtils implements MqttCallbackExtended{ private static TbAccessMainLogServiceI tbAccessMainLogService = ApplicationContextUtil.getContext().getBean(TbAccessMainLogServiceI.class); private static TbAppUserServiceI tbAppUserService = ApplicationContextUtil.getContext().getBean(TbAppUserServiceI.class); // MQTT服务器地址及端口、连接协议 private static String broker = "tcp://10.211.9.160:1883"; // 10.211.9.160 // MQTT连接用户 private static String username = "admin"; // MQTT连接密码 private static String password = "admin"; // 客户端实例 private MqttClient client; // QoS private int qos; // 发送后是否保留 private boolean retained; // 需要订阅的主题 private ListsubscirbeTopics; // 客户端名称 private String clientName; /** * * @param broker 服务地址,例如:tcp://10.211.9.135:1883 * @param username 登录名 * @param password 登录密码 */ // public MqttUtils(String broker, String username, String password) { // this.clientName = UUID.randomUUID().toString().replaceAll("-", ""); // this.broker = broker; // this.username = username; // this.password = password; // this.subscirbeTopics = new ArrayList<>(); // // 默认为 0 - 至多一次 // this.qos = 0; // // 默认不保留 // this.retained = false; // } /** * 默认地址 */ public MqttUtils() { this.clientName = UUID.randomUUID().toString().replaceAll("-", ""); this.subscirbeTopics = new ArrayList<>(); // 默认为 0 - 至多一次 this.qos = 0; // 默认不保留 this.retained = false; } public void connect() { try { this.client = new MqttClient(broker, clientName); } catch (MqttException e) { e.printStackTrace(); } // MqttConnectOptions options = new MqttConnectOptions(); // options.setUserName(this.username); // options.setPassword(password.toCharArray()); // options.setCleanSession(true); // client.setCallback(this); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(this.username); options.setPassword(password.toCharArray()); options.setCleanSession(true); options.setAutomaticReconnect(true); options.setConnectionTimeout(10); options.setKeepAliveInterval(90); client.setCallback(this); try { client.connect(options); } catch (MqttException e) { e.printStackTrace(); } } /** * * @param topic 消息要发送的主题 * @param content 消息内容 * @param isClosed 消息发送后是否关闭客户端连接 */ public void publish(String topic, String content, boolean isClosed) { if (this.client == null || !client.isConnected()) { System.out.println("客户端连接异常!"); return; } try { client.publish(topic, content.getBytes(), this.qos, this.isRetained()); System.out.printf("消息已发送至主题【%s】!\n", topic); } catch (Exception e) { System.out.println("消息发布异常:" + e.getMessage()); } if (isClosed) { try { client.disconnect(); client.close(); } catch (MqttException e) { System.out.println("资源释放失败:" + e.getMessage()); } } } @Override public void connectionLost(Throwable arg0) { System.out.println("连接已丢失!异常信息:" + arg0.getMessage()); if (client != null) { if (!client.isConnected()) { try { client.reconnect(); System.out.println("服务器正在重连!"); } catch (MqttException e) { e.printStackTrace(); } } } } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { try { System.out.printf("消息#%d已发布至主题【%s】,内容:%s!\n", arg0.getMessageId(), String.join(",", arg0.getTopics()), new String(arg0.getMessage().getPayload())); } catch (MqttException e) { e.printStackTrace(); } } /** * 处理订阅的消息 * * @param arg0 * @param arg1 * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.printf("接收到来自【%s】的消息,消息长度:%d\n", topic, new String(message.getPayload()).length()); //人脸下发回调 System.out.println("==========message-mqtt========="+message); Map dataMapUpdate =App.dataMapUpdate; if(dataMapUpdate.containsKey(topic)) { //{"messageId":"8a166032b7f9493ca9fc675b2c2e76b1","operator":"EditPerson","info":{"birthday":"","cardValidEnd":"","address":"","cardNum2":"","notes":"","gender":1,"cardValidBegin":"","cardType2":0,"nation":1,"idCard":"522726198907214915","cardType":0,"customId":"522726198907214915","tempCardType":0,"EffectNumber":10,"telnum1":"","native":"","picURI":"http://172.16.20.111:5000/faces/522726198907214915_real.jpg","name":"柏秀华","personId":"1","personType":0}} JSONObject msgObj = JSON.parseObject(new String(message.getPayload())); JSONObject info = msgObj.getJSONObject("info"); String idCard = info.getString("idCard"); String name = info.getString("name"); // mqtt/face/1702872 TbLienroomAppEntity o = dataMapUpdate.get(topic); List tbAppUsers = tbAccessMainLogService.findHql("FROM TbAppUserEntity WHERE appId = ? AND userName = ?", o.getAppIp(),idCard); if(null != tbAppUsers && tbAppUsers.size()>0) { TbAppUserEntity tbAppUser = tbAppUsers.get(0); tbAppUser.setWill1("1"); tbAppUserService.saveOrUpdate(tbAppUser); } } //记录日志开始 Map dataMap =App.dataMap; if(dataMap.containsKey(topic)) { JSONObject msgObj = JSON.parseObject(new String(message.getPayload())); JSONObject info = msgObj.getJSONObject("info"); String idCard = info.getString("idCard"); String persionName = info.getString("persionName"); Date time = info.getDate("time"); TbLienroomAppEntity o = dataMap.get(topic); TbAccessLogEntity log = new TbAccessLogEntity(); log.setUsername(idCard); log.setAccessDate(time); log.setAccess(o.getLienRoomNum()); log.setAccessType(o.getAppPosition()+""); log.setRealname(persionName); log.setReason("开门"); isNewMainLog(log); } } public boolean isNewMainLog(TbAccessLogEntity log) { String hql = "from TbAccessMainLogEntity where username = '"+log.getUsername()+"' and exitDate is null"; List tbAccessLogList = new ArrayList (); tbAccessLogList.add(log); try { //查询在区中主日志 TbAccessMainLogEntity tempMainLog = tbAccessMainLogService.singleResult(hql); if(!StringUtil.isNotEmpty(tempMainLog)) {//生成新的main_log并绑定 tempMainLog = new TbAccessMainLogEntity(); tempMainLog.setEntryAreas(log.getAccess()); tempMainLog.setEntryDate(log.getAccessDate()); tempMainLog.setRealname(log.getRealname()); tempMainLog.setUserKey(log.getUserKey()); tempMainLog.setUsername(log.getUsername()); tbAccessMainLogService.addMain(tempMainLog,tbAccessLogList); }else { if(tempMainLog.getEntryAreas().equals(log.getAccess()) &&"1".equals(log.getAccessType())) { tempMainLog.setExitDate(log.getAccessDate()); } log.setParentId(tempMainLog.getId()); tbAccessMainLogService.saveOrUpdate(tempMainLog); tbAccessMainLogService.save(log); } } catch (Exception e) { e.printStackTrace(); } return false; } @Override public void connectComplete(boolean arg0, String arg1) { if (subscirbeTopics.size() == 0) { System.out.println("没有需要订阅的主题。"); return; } if (client != null && client.isConnected()) { System.out.println("服务器连接成功!开始订阅主题:" + String.join(",", this.subscirbeTopics)); try { // subscirbeTopics.stream().forEach(s -> { // }); client.subscribe(subscirbeTopics.toArray(new String[subscirbeTopics.size()])); } catch (MqttException e) { System.out.println("订阅主题失败:" + e.getMessage()); } } else { System.out.println("客户端尚连接异常!"); } } /** * * @param base64String 保存base64字符串为图片 * @param filepath 图片路径 * @throws IOException */ public static void convertBase64StrToImage(String base64String, String filepath) throws IOException { String flag = "data:image/jpeg;base64,"; if (base64String.contains(flag)) { base64String = base64String.substring(flag.length()); } byte[] imageBytes = Base64.getDecoder().decode(base64String); File file = new File(filepath); File dir = new File(file.getParent()); if (!dir.exists()) { dir.mkdirs(); } if (file.exists()) { file.createNewFile(); } FileOutputStream fos = new FileOutputStream(file); fos.write(imageBytes); fos.flush(); fos.close(); } /** * 把图片转换为Base64方法 * * @param filepath * @return * @throws IOException */ public static String convertImageToBase64Str(String filepath) throws IOException { String flag = "data:image/jpeg;base64,"; File file = new File(filepath); if (!file.exists()) { throw new FileNotFoundException("目标文件不存在!"); } FileInputStream fis = new FileInputStream(file); if (fis.available() > 1 << 20) { fis.close(); throw new IllegalArgumentException("目标图片文件过大!不能超过1M!"); } byte[] buffer = new byte[fis.available()]; fis.read(buffer); fis.close(); return flag + Base64.getEncoder().encodeToString(buffer); } public boolean isRetained() { return retained; } public void setRetained(boolean retained) { this.retained = retained; } public MqttClient getClient() { return client; } public void setClient(MqttClient client) { this.client = client; } public String getBroker() { return broker; } public void setBroker(String broker) { this.broker = broker; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getQos() { return qos; } public void setQos(int qos) { this.qos = qos; } public List getSubscirbeTopics() { return subscirbeTopics; } public void setSubscirbeTopics(List subscirbeTopics) { this.subscirbeTopics = subscirbeTopics; } public String getClientName() { return clientName; } public void setClientName(String clientName) { this.clientName = clientName; } }
安装包:
链接:https://pan.baidu.com/s/1WyEwstdhaCC4s5HtU_-e9g
提取码:5n8b
解压程序包
$ unzip emqx-ubuntu18.04-v4.0.0.zip
启动 EMQ X Broker
$ ./bin/emqx start
emqx 4.0.0 is started successfully!
$ ./bin/emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running
停止 EMQ X Broker
$ ./bin/emqx stop
ok
卸载 EMQ X Broker
直接删除 EMQ X 目录即可
查看 Dashboard
EMQ X Dashboard 是一个 Web 应用程序,你可以直接通过浏览器来访问它,无需安装任何其他软件。
当 EMQ X 成功运行在你的本地计算机上且 EMQ X Dashboard 被默认启用时,你可以访问 http://localhost:18083 来查看你的 Dashboard,默认用户名是 admin
,密码是 public
。