ActiveMQ 本地测试
参考博主 搭建~ https://www.cnblogs.com/jaycekon/p/6225058.html
ActiveMQ官网下载地址:http://activemq.apache.org/download.html
我下载的是 Windows 版本。
下载解压之后进入D:\config\apache-activemq-5.15.7\bin\win64
双击运行 activemq.bat,启动本地 MQ 服务,
控制台输出中出现 started 字样,说明启动成功。
接下来是代码部分:
生产者:Producer
package com.mqtest;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo message producer: opens one connection and one transacted session to
 * the ActiveMQ broker and then sends a numbered text message to a queue
 * roughly once per second.
 *
 * <p>NOTE(review): a single {@link Session} is shared by every thread that
 * calls {@link #sendMessage(String)}; only the {@link MessageProducer} is kept
 * per thread. JMS sessions are documented as single-threaded contexts, so
 * confirm this sharing is acceptable before reusing the class outside a demo.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 14:31
 */
public class Producer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    // Default ActiveMQ user name.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // Default ActiveMQ password.
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // Default ActiveMQ broker URL.
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Running message number, shared by all sending threads.
    AtomicInteger count = new AtomicInteger(0);
    // Connection factory.
    ConnectionFactory connectionFactory;
    // Connection to the broker.
    Connection connection;
    // Transacted session used to create queues, producers and messages.
    Session session;
    // Per-thread cache of the producer created on that thread's first call.
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    /**
     * Creates the connection factory, opens and starts the connection, and
     * creates a transacted session. A {@link JMSException} is logged and not
     * rethrown, in which case {@code session} remains unset.
     */
    public void init() {
        LOGGER.info("Product init");
        try {
            // Alternative with explicit credentials and URL:
            // connectionFactory = new ActiveMQConnectionFactory("admin", "demo", "tcp://127.0.0.1:61616");
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            // First argument "true" makes the session transacted, so sends
            // only take effect once commit() is called.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            LOGGER.error("", e);
        }
    }

    /**
     * Sends text messages to the given queue in an endless loop, one per
     * second, committing the session after each send. The method only returns
     * when a {@link JMSException} occurs or the calling thread is interrupted.
     *
     * <p>The producer is created on a thread's first call and reused on later
     * calls from the same thread, regardless of the {@code disname} passed then.
     *
     * @param disname name of the destination queue
     */
    public void sendMessage(String disname) {
        try {
            Queue queue = session.createQueue(disname);

            MessageProducer messageProducer = threadLocal.get();
            if (messageProducer == null) {
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }

            while (true) {
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                TextMessage msg = session.createTextMessage(
                        Thread.currentThread().getName() + "==Productor:我现在正在生产东西!,count:" + num);
                LOGGER.info("msg:{} + {}", msg, num);
                messageProducer.send(msg);
                // Commit the transaction so the message is actually delivered.
                session.commit();
            }
        } catch (JMSException e) {
            LOGGER.error("", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("", e);
        }
    }
}
消费者:Consumer
package com.mqtest;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo message consumer: opens one connection and one auto-acknowledge session
 * to the ActiveMQ broker and then receives text messages from a queue, logging
 * each one together with a running count.
 *
 * <p>NOTE(review): a single {@link Session} is shared by every thread that
 * calls {@link #getMessage(String)}; only the {@link MessageConsumer} is kept
 * per thread. JMS sessions are documented as single-threaded contexts, so
 * confirm this sharing is acceptable before reusing the class outside a demo.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 14:34
 */
public class Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    // Default ActiveMQ user name.
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // Default ActiveMQ password.
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // Default ActiveMQ broker URL.
    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;
    Connection connection;
    Session session;
    // Per-thread cache of the consumer created on that thread's first call.
    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    // Number of messages logged so far, shared by all receiving threads.
    AtomicInteger count = new AtomicInteger();

    /**
     * Creates the connection factory, opens and starts the connection, and
     * creates a non-transacted session in {@code AUTO_ACKNOWLEDGE} mode. A
     * {@link JMSException} is logged and not rethrown, in which case
     * {@code session} remains unset.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            LOGGER.error("", e);
        }
    }

    /**
     * Receives messages from the given queue in a loop, pausing one second
     * before each blocking receive, and logs the text of every message. The
     * method returns when {@code receive()} yields {@code null}, when a
     * {@link JMSException} occurs, or when the calling thread is interrupted.
     *
     * <p>The consumer is created on a thread's first call and reused on later
     * calls from the same thread, regardless of the {@code disname} passed then.
     *
     * @param disname name of the queue to read from
     */
    public void getMessage(String disname) {
        try {
            Queue queue = session.createQueue(disname);

            MessageConsumer consumer = threadLocal.get();
            if (consumer == null) {
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }

            while (true) {
                Thread.sleep(1000);
                // Assumes every message on the queue is a TextMessage, as sent
                // by the matching Producer — TODO confirm for other senders.
                TextMessage msg = (TextMessage) consumer.receive();
                if (msg == null) {
                    break;
                }
                // Kept from the original; the session is already AUTO_ACKNOWLEDGE.
                msg.acknowledge();
                LOGGER.info("{}:Consumer:我是消费者,我正在消费Msg:{}----->{}",
                        Thread.currentThread().getName(), msg.getText(), count.getAndIncrement());
            }
        } catch (JMSException e) {
            LOGGER.error("", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can see the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("", e);
        }
    }
}
启动生产者:
package com.mqtest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Maggie.Hao * @date 2018/11/5 14:34 */ public class TestProducer{ private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class); public static void main(String[] args){ Producer producer = new Producer(); producer.init(); TestProducer testMq = new TestProducer(); try{ Thread.sleep(1000); }catch (InterruptedException e){ LOGGER.error("", e); } //Thread 1 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 2 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 3 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 4 new Thread(testMq.new ProductorMq(producer)).start(); //Thread 5 new Thread(testMq.new ProductorMq(producer)).start(); } private class ProductorMq implements Runnable{ Producer producter; public ProductorMq(Producer producter){ this.producter = producter; } @Override public void run(){ while (true){ try{ producter.sendMessage("Jaycekon-MQ"); Thread.sleep(10000); }catch (InterruptedException e){ LOGGER.error("{}", e); } } } } }
启动消费者:
package com.mqtest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Launcher for the consumer side of the demo: initialises one shared
 * {@link Consumer} and starts several threads that all read from the
 * {@code Jaycekon-MQ} queue through it.
 *
 * @author Maggie.Hao
 * @date 2018/11/5 15:39
 */
public class TestConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);

    // Number of receiving threads started by main.
    private static final int WORKER_COUNT = 4;

    /**
     * Initialises the consumer and starts the worker threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.init();

        for (int i = 0; i < WORKER_COUNT; i++) {
            new Thread(new ConsumerMq(consumer)).start();
        }
    }

    /**
     * Worker that repeatedly asks the shared consumer to receive messages,
     * waiting ten seconds before retrying whenever {@code getMessage} returns.
     */
    private static class ConsumerMq implements Runnable {
        Consumer consumer;

        public ConsumerMq(Consumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            // Stop once the interrupt flag is observed instead of swallowing
            // the interruption and looping forever.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    consumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    LOGGER.error("", e);
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
控制台输出结果:
可以在 http://127.0.0.1:8161/admin/queues.jsp 查看结果
用户名和密码默认都为:admin
点击Queues可以看到我们的消息队列信息