【RabbitMQ】RabbitMQ简单使用(七)
1、RabbitMQ客户端
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>
2、工具类
/**
 * Helper that opens connections to the RabbitMQ broker used by the examples.
 *
 * <p>NOTE(review): host and credentials are hard-coded here; consider moving
 * them to external configuration before using this outside of a demo.
 */
public class ConnectUtils {

    /**
     * Builds a {@link ConnectionFactory} with the fixed broker settings and
     * opens a new connection from it.
     *
     * @return the connection returned by {@code ConnectionFactory.newConnection()}
     * @throws IOException      propagated from {@code newConnection()}
     * @throws TimeoutException propagated from {@code newConnection()}
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();

        // Broker endpoint.
        connectionFactory.setHost("test.rabbitmq.com");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        // Credentials.
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("wsxadmin");

        // Every call hands back a fresh connection; the caller owns and closes it.
        return connectionFactory.newConnection();
    }
}
3、简单模式
生产者
/**
 * Simple-mode producer: publishes 100 numbered messages to the {@code hello}
 * queue through the default ("") exchange.
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    /**
     * Declares the queue, publishes the messages and releases the channel and
     * connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] argv) throws Exception {
        // try-with-resources closes the channel first and then the connection,
        // and still closes the connection when closing the channel throws.
        try (Connection connection = ConnectUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            for (int i = 0; i < 100; i++) {
                String payload = message + "==" + i;
                // Encode explicitly as UTF-8 instead of relying on the platform default charset.
                channel.basicPublish("", QUEUE_NAME, null,
                        payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
消费者
/**
 * Simple-mode consumer: subscribes to the {@code hello} queue with automatic
 * acknowledgement and prints every message body it receives.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    /**
     * Opens a connection and channel, declares the queue and registers the consumer.
     * The connection and channel are intentionally left open so that deliveries
     * keep arriving after this method returns.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnectUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                // Decode explicitly as UTF-8 rather than with the platform default charset,
                // matching the other consumers in these examples.
                String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("consumer receive msg: " + msg);
            }
        };
        // Second argument true = auto-ack.
        channel.basicConsume(QUEUE_NAME, true, consumer);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    }
}
4、订阅模式
/**
 * Publish/subscribe producer: declares a fanout exchange, binds the two example
 * queues to it and publishes one message.
 */
public class Producer {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String EXCHANGE_TYPE = "fanout";
    private static final String QUEUE1 = "QUEUE1";
    private static final String QUEUE2 = "QUEUE2";
    private static final String ROUTING_KEY = "";

    /**
     * Sets up the exchange, queues and bindings, then publishes a single message.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Note: a message published while no queue is bound to the exchange is lost.
        // 1. Open the connection and 2. the channel; try-with-resources closes the
        //    channel and then the connection, even if closing the channel throws.
        try (Connection connection = ConnectUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // 3. Declare the exchange (name, type).
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
            // Declare the queues before binding them: binding a queue that does not
            // exist yet is rejected by the broker. The arguments match the ones the
            // consumer uses, so whichever side runs first the declaration agrees.
            channel.queueDeclare(QUEUE1, false, false, false, null);
            channel.queueDeclare(QUEUE2, false, false, false, null);
            channel.queueBind(QUEUE1, EXCHANGE_NAME, ROUTING_KEY);
            channel.queueBind(QUEUE2, EXCHANGE_NAME, ROUTING_KEY);
            String msg = "fanout_exchange_msg";
            // 4. Publish the message.
            //    param1: exchange
            //    param2: routing key
            //    param3: message properties
            //    param4: message body (encoded explicitly as UTF-8)
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        // 5. Channel and connection are closed by try-with-resources.
    }
}
消费者
/**
 * Publish/subscribe consumer: declares a queue, binds it to the given exchange
 * with an empty routing key and prints every message delivered to it.
 * {@link #main(String[])} starts one consumer thread per example queue.
 */
public class Consumer implements Runnable {
    private final String queueName;
    private final String exchangeName;

    /**
     * @param queueName    queue to declare and consume from
     * @param exchangeName exchange the queue is bound to; it is not declared here
     */
    public Consumer(String queueName, String exchangeName) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
    }

    /**
     * Opens a dedicated connection and channel, sets up the queue and binding and
     * registers an auto-acknowledging consumer. On success the connection stays
     * open so deliveries keep arriving; on failure the stack trace is printed and
     * the connection is released.
     */
    @Override
    public void run() {
        Connection connection = null;
        try {
            // 1. Open a new connection.
            connection = ConnectUtils.getConnection();
            // 2. Open a channel.
            Channel channel = connection.createChannel();
            // 3. Declare the queue this consumer reads from.
            channel.queueDeclare(queueName, false, false, false, null);
            // 4. Bind the queue to the exchange (queue, exchange, routing key).
            channel.queueBind(queueName, exchangeName, "");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(Thread.currentThread().getName() + "fanout---消费者获取生产者消息:" + msg);
                }
            };
            // 5. Start consuming with auto-ack.
            channel.basicConsume(queueName, true, consumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed part-way: release the connection instead of leaking it.
            // abort() discards any errors raised while closing.
            if (connection != null) {
                connection.abort();
            }
        }
    }

    /**
     * Starts one consumer thread for each of the two example queues.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Thread(new Consumer("QUEUE1", "fanout_exchange")).start();
        new Thread(new Consumer("QUEUE2", "fanout_exchange")).start();
    }
}
5、路由模式
/**
 * Routing-mode producer: declares a direct exchange and publishes one message
 * with the routing key {@code info}.
 */
public class Producer {
    private static final String EXCHANGE_NAME = "direct_exchange";

    /**
     * Declares the exchange and publishes a single message.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Note: a message published while no queue is bound for its routing key is lost.
        // 1. Open the connection and 2. the channel; try-with-resources closes the
        //    channel and then the connection, even if closing the channel throws.
        try (Connection connection = ConnectUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // 3. Declare the exchange (name, type).
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            String routingKey = "info";
            String msg = "direct_exchange_msg" + routingKey;
            // 4. Publish the message.
            //    param1: exchange
            //    param2: routing key
            //    param3: message properties
            //    param4: message body (encoded explicitly as UTF-8)
            channel.basicPublish(EXCHANGE_NAME, routingKey, null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        // 5. Channel and connection are closed by try-with-resources.
    }
}
消费者
/**
 * Routing-mode consumer: binds the {@code consumer_direct_email} queue to the
 * direct exchange for the routing keys {@code error} and {@code info} and prints
 * every message delivered to it.
 */
public class ConsumerEmailDirect {
    private static final String QUEUE_NAME = "consumer_direct_email";
    private static final String EXCHANGE_NAME = "direct_exchange";

    /**
     * Sets up the exchange, queue and bindings and registers an auto-acknowledging
     * consumer. The connection is left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if connecting times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Open a new connection.
        Connection connection = ConnectUtils.getConnection();
        // 2. Open a channel.
        Channel channel = connection.createChannel();
        // Declare the exchange with the same type the producer uses, so this
        // consumer can be started before the producer has ever run; binding to
        // an exchange that does not exist yet is rejected by the broker.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 3. Declare the queue this consumer reads from.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4. Bind the queue to the exchange (queue, exchange, routing key).
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("邮件消费者获取生产者消息:" + msg);
            }
        };
        // 5. Start consuming with auto-ack.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
6、主题模式
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.wcw.util.ConnectUtils;
/**
 * Topic-mode consumer: declares the topic exchange and the example queues, sets
 * up the example bindings and prints every message delivered to its own queue.
 * {@link #main(String[])} starts one consumer thread per example queue.
 */
public class Consumer implements Runnable {
    // Defined locally: the matching constants in the producer class are not in
    // scope here, so the unqualified names previously did not resolve.
    private static final String EXCHANGE_TYPE = "topic";
    private static final String QUEUE3 = "QUEUE3";
    private static final String QUEUE4 = "QUEUE4";

    private final String queueName;
    private final String exchangeName;

    /**
     * @param queueName    queue this consumer declares and consumes from
     * @param exchangeName topic exchange that is declared and bound to
     */
    public Consumer(String queueName, String exchangeName) {
        this.queueName = queueName;
        this.exchangeName = exchangeName;
    }

    /**
     * Opens a dedicated connection and channel, declares the exchange and queues,
     * creates the bindings and registers an auto-acknowledging consumer on
     * {@code queueName}. On success the connection stays open so deliveries keep
     * arriving; on failure the stack trace is printed and the connection is released.
     */
    @Override
    public void run() {
        Connection connection = null;
        try {
            // 1. Open a new connection.
            connection = ConnectUtils.getConnection();
            // 2. Open a channel.
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE);
            // 3. Declare the queue this consumer reads from.
            channel.queueDeclare(queueName, false, false, false, null);
            // The bindings below reference both example queues, so make sure both
            // exist regardless of which consumer thread gets here first
            // (re-declaring with identical arguments is harmless).
            channel.queueDeclare(QUEUE3, false, false, false, null);
            channel.queueDeclare(QUEUE4, false, false, false, null);
            // 4. Bind the queues to the exchange (queue, exchange, binding key).
            //    '#' matches zero or more words, '*' matches exactly one word.
            channel.queueBind(QUEUE3, exchangeName, "#.orange.*");
            channel.queueBind(QUEUE4, exchangeName, "lazy.#");
            channel.queueBind(QUEUE4, exchangeName, "*.*.rabbit");
            final String qn = this.queueName;
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println(qn + "---消费者获取生产者消息:" + msg);
                }
            };
            // 5. Start consuming with auto-ack.
            channel.basicConsume(queueName, true, consumer);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            // Setup failed part-way: release the connection instead of leaking it.
            // abort() discards any errors raised while closing.
            if (connection != null) {
                connection.abort();
            }
        }
    }

    /**
     * Starts one consumer thread for each of the two example queues.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Thread(new Consumer("QUEUE3", "topic_exchange")).start();
        new Thread(new Consumer("QUEUE4", "topic_exchange")).start();
    }
}
生产者
/**
 * Topic-mode producer: declares the topic exchange, binds the two example queues
 * with wildcard binding keys and publishes four messages with different routing keys.
 */
public class Producer {
    protected static final String EXCHANGE_NAME = "topic_exchange";
    protected static final String EXCHANGE_TYPE = "topic";
    protected static final String QUEUE3 = "QUEUE3";
    protected static final String QUEUE4 = "QUEUE4";

    /**
     * Sets up the exchange, queues and bindings, then publishes the example messages.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if connecting or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Note: a message whose routing key matches no binding is lost.
        // 1. Open the connection and 2. the channel; try-with-resources closes the
        //    channel and then the connection, even if closing the channel throws.
        try (Connection connection = ConnectUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // 3. Declare the exchange (name, type).
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
            // Declare the queues before binding them: binding a queue that does not
            // exist yet is rejected by the broker. The arguments match the ones the
            // consumer uses, so whichever side runs first the declaration agrees.
            channel.queueDeclare(QUEUE3, false, false, false, null);
            channel.queueDeclare(QUEUE4, false, false, false, null);
            // '#' matches zero or more words, '*' matches exactly one word.
            channel.queueBind(QUEUE3, EXCHANGE_NAME, "#.orange.*");
            channel.queueBind(QUEUE4, EXCHANGE_NAME, "lazy.#");
            channel.queueBind(QUEUE4, EXCHANGE_NAME, "*.*.rabbit");
            String msg = "fanout_exchange_msg";
            java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
            // 4. Publish the messages.
            //    param1: exchange
            //    param2: routing key
            //    param3: message properties
            //    param4: message body (encoded explicitly as UTF-8)
            channel.basicPublish(EXCHANGE_NAME, "aa.orange.1", null, "33".getBytes(utf8));
            // Matches none of the binding keys above, so no queue receives it.
            channel.basicPublish(EXCHANGE_NAME, "aa.rabbit.1", null, msg.getBytes(utf8));
            channel.basicPublish(EXCHANGE_NAME, "lazy.orange.1", null, "33-44".getBytes(utf8));
            channel.basicPublish(EXCHANGE_NAME, "1.2.rabbit", null, "444".getBytes(utf8));
        }
        // 5. Channel and connection are closed by try-with-resources.
    }
}