emq共享订阅
1 共享订阅
多个客户端订阅了同一个主题,发布者发布主题时,每个客户端都会同时收到这个主题的消息。在客户端集群部署的场景下会出现消息重复处理的问题。
EMQ支持共享订阅,多个客户端订阅了同一个主题,发布者发布主题时,只有其中一个客户端接收到消息。
共享订阅有两种方式:
(1)共享订阅:订阅前缀$queue/
多个客户端订阅了$queue/topic,发布者发布到topic,则只有一个客户端会接收到消息。
(2)分组订阅:订阅前缀$share/
多组客户端订阅了$queue/group1/topic、$queue/group2/topic...,发布者发布到topic,则消息会发布到每个group中,但是每个group中只有一个客户端会接收到消息。
2 Java客户端实现共享订阅
开发时发现,使用eclipse paho java客户端时,无法处理共享订阅。订阅$queue/topic能够订阅成功,并且跟踪代码能看到emq也把消息转发到了客户端,但是客户端丢弃掉了。
解决方法就是重写mqtt的回调函数,实现MqttCallback接口。
实现MqttCallback接口的代码如下:
package com.emqtest.emqtest; import java.util.HashMap; import java.util.Map; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; public class SharedSubCallbackRouter implements MqttCallback { private MaptopicFilterListeners; public SharedSubCallbackRouter(Map topicFilterListeners) { this.topicFilterListeners = topicFilterListeners; } public void addSubscriber(String topicFilter, IMqttMessageListener listener) { if (this.topicFilterListeners == null) { this.topicFilterListeners = new HashMap<>(); } this.topicFilterListeners.put(topicFilter, listener); } @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { for (Map.Entry listenerEntry : topicFilterListeners.entrySet()) { String topicFilter = listenerEntry.getKey(); if (isMatched(topicFilter, topic)) { listenerEntry.getValue().messageArrived(topic, message); } } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } /** * Paho topic matcher does not work with shared subscription topic filter of emqttd * https://github.com/eclipse/paho.mqtt.java/issues/367#issuecomment-300100385 * * http://emqtt.io/docs/v2/advanced.html#shared-subscription * * @param topicFilter the topicFilter for mqtt * @param topic the topic * @return boolean for matched */ private boolean isMatched(String topicFilter, String topic) { if (topicFilter.startsWith("$queue/")) { topicFilter = topicFilter.replaceFirst("\\$queue/", ""); } else if (topicFilter.startsWith("$share/")) { topicFilter = topicFilter.replaceFirst("\\$share/", ""); topicFilter = topicFilter.substring(topicFilter.indexOf('/')); } return MqttTopic.isMatched(topicFilter, topic); } }
创建emq连接代码如下:
mqttClient = new MqttClient("tcp://localhost:1883", "MqttClient"); mqttClient.connect(); Maplisteners = new HashMap<>(); IMqttMessageListener emqListener = new EmqListener(); listeners.put("$queue/testmqtt", emqListener); mqttClient.setCallback(new SharedSubCallbackRouter(listeners)); mqttClient.subscribe("$queue/testmqtt", new EmqListener());
还要再写一个实现IMqttMessageListener接口的Emq消息处理类:
@Component public class EmqListener implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { try { System.out.println("topic: " + topic); } catch (Exception e) { e.printStackTrace(); } } }
参考链接:
1 emq的github上关于这个问题的讨论:
https://github.com/emqx/emqx/issues/921#event-1023359646
2 网上有人给的一个解决方法示例代码:
https://github.com/yogin16/paho-shared-sub-example
3 eclipse paho的github链接:
https://github.com/eclipse/paho.mqtt.java