activeMQ 本地测试


参考博主 搭建~ https://www.cnblogs.com/jaycekon/p/6225058.html

ActiveMQ官网下载地址:http://activemq.apache.org/download.html

我下的是windows版本的

下载解压之后进入D:\config\apache-activemq-5.15.7\bin\win64

双击运行activemq.bat,启动本地MQ服务,

 started说明启动成功。

接下来是代码部分:

生产者:Producer

package com.mqtest;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Maggie.Hao
 * @date 2018/11/5 14:31
 */
public class Producer{

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    //ActiveMq 的默认用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    //ActiveMq 的默认登录密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //ActiveMQ 的链接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);

    //链接工厂
    ConnectionFactory connectionFactory;

    //链接对象
    Connection connection;

    //事务管理
    Session session;

    ThreadLocal threadLocal = new ThreadLocal<>();

    public void init(){
        LOGGER.info("Product init");
        try{
            //创建一个链接工厂
            //            connectionFactory = new ActiveMQConnectionFactory("admin","demo","tcp://127.0.0.1:61616");
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
            //从工厂中创建一个链接
            connection = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //创建一个事务(这里通过参数可以设置事务的级别)
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        }catch (JMSException e){
            LOGGER.error("", e);
        }
    }

    public void sendMessage(String disname){
        try{
            //创建一个消息队列
            Queue queue = session.createQueue(disname);
            //消息生产者
            MessageProducer messageProducer = null;
            if (threadLocal.get() != null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }

            while (true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //创建一条消息
                TextMessage msg;
                msg = session.createTextMessage(Thread.currentThread().getName() + "==Productor:我现在正在生产东西!,count:" + num);
                LOGGER.info("msg:{} + {}", msg, num);
                //发送消息
                messageProducer.send(msg);
                //提交事务
                session.commit();
            }
        }catch (JMSException e){
            LOGGER.error("", e);
        }catch (InterruptedException e){
            LOGGER.error("", e);
        }
    }
}

消费者:Consumer

package com.mqtest;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Maggie.Hao
 * @date 2018/11/5 14:34
 */
public class Consumer{

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal threadLocal = new ThreadLocal<>();

    AtomicInteger count = new AtomicInteger();

    public void init(){
        try{
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }catch (JMSException e){
            LOGGER.error("", e);
        }
    }

    public void getMessage(String disname){
        try{
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if (threadLocal.get() != null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while (true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if (msg != null){
                    msg.acknowledge();
                    LOGGER.info("{}:Consumer:我是消费者,我正在消费Msg:{}----->{}", Thread.currentThread().getName(), msg.getText(), count.getAndIncrement());
                }else{
                    break;
                }
            }
        }catch (JMSException e){
            LOGGER.error("", e);
        }catch (InterruptedException e){
            LOGGER.error("", e);
        }
    }
}

启动生产者:

package com.mqtest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Maggie.Hao
 * @date 2018/11/5 14:34
 */
public class TestProducer{

    private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);

    public static void main(String[] args){
        Producer producer = new Producer();
        producer.init();
        TestProducer testMq = new TestProducer();
        try{
            Thread.sleep(1000);
        }catch (InterruptedException e){
            LOGGER.error("", e);
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producer)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producer)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producer)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producer)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producer)).start();
    }

    private class ProductorMq implements Runnable{

        Producer producter;

        public ProductorMq(Producer producter){
            this.producter = producter;
        }

        @Override
        public void run(){
            while (true){
                try{
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    LOGGER.error("{}", e);
                }
            }
        }
    }
}

启动消费者:

package com.mqtest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author Maggie.Hao
 * @date 2018/11/5 15:39
 */
public class TestConsumer{

    private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);

    public static void main(String[] args){
        Consumer comsumer = new Consumer();
        comsumer.init();
        TestConsumer testConsumer = new TestConsumer();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{

        Consumer consumer;

        public ConsumerMq(Consumer consumer){
            this.consumer = consumer;
        }

        @Override
        public void run(){
            while (true){
                try{
                    consumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    LOGGER.error("", e);
                }
            }
        }
    }
}

控制台输出结果:

可以在   http://127.0.0.1:8161/admin/queues.jsp 查看结果

用户名和密码默认都为:admin 

点击Queues可以看到我们的消息队列信息