本文主要讨论 RabbitMQ,从3月底接触一个项目使用了 RabbitMQ,就开始着手学习,主要通过视频和博客学习了一个月,基本明白了 RabbitMQ 的应用,其它的 MQ 队列还不清楚,其底层技术还有待学习,以下是我目前的学习心得。
1.安装 Erlang
RabbitMQ 是基于 Erlang 语言写的,所以首先安装 Erlang,本例是在 Windows 上安装,也可以选择在 Linux 上安装,机器上没有虚拟机,直接在 Windows 上操作,建议在 Linux 上安装。官方下载 Erlang 软件,我下载最新版本 21.3。安装过程很简单,直接 Next 到底。 Linux 安装自行谷歌。如下图:
安装结束后,设置环境变量,如下图
测试是否安装成功
2.安装 RabbitMQ
在官方下载,选择最新版本 3.7。安装过程很简单,直接 Next 到底。如下图:
测试安装是否成功,进入安装目录 sbin,执行 rabbitmq-plugins enable rabbitmq_management 命令,出现下面界面,证明安装成功(建议以管理员方式打开 dos)。
执行 rabbitmq-server start 命令,启动服务。本地登陆并创建用户,如下图:
关于tags标签的解释:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
4.JAVA 操作RabbitMQ
参考 RabbitMQ 官网,一共分为6个模式
RabbitMQ 是一个消息代理,实际上,它接收生产者产生的消息,然后将消息传递给消费者。在这个过程中,它可以路由、缓冲、持久化等,在传输过程中,主要又三部分组成。
生产者:发送消息的一端
一般情况下,消息生产者、消费者和队列不在同一台服务器上,本地做测试,放在一台服务器上。 测试项目直接创建一个 maven 格式的项目,没必要创建网络格式。新建一个项目,如下图:
首先准备操作 MQ 的环境
(1): 准备必要的 Pom 文件,导入相应的 jar 包,
1 <? xml version="1.0" encoding="UTF-8" ?>
2
3 < project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
5 < modelVersion > 4.0.0</ modelVersion >
6
7 < groupId > com.edu</ groupId >
8 < artifactId > rabbitmqdemo</ artifactId >
9 < version > 1.0</ version >
10
11 < name > rabbitmqdemo</ name >
12 <!-- FIXME change it to the project's website -->
13 < url > http://www.example.com</ url >
14
15 < properties >
16 < project.build.sourceEncoding > UTF-8</ project.build.sourceEncoding >
17 < maven.compiler.source > 1.7</ maven.compiler.source >
18 < maven.compiler.target > 1.7</ maven.compiler.target >
19 </ properties >
20
21 < dependencies >
22 <!-- 测试包 -->
23 < dependency >
24 < groupId > junit</ groupId >
25 < artifactId > junit</ artifactId >
26 < version > 4.11</ version >
27 < scope > test</ scope >
28 </ dependency >
29 <!-- mq客户端 -->
30 < dependency >
31 < groupId > com.rabbitmq</ groupId >
32 < artifactId > amqp-client</ artifactId >
33 < version > 4.5.0</ version >
34 </ dependency >
35 <!-- 日志 -->
36 < dependency >
37 < groupId > org.slf4j</ groupId >
38 < artifactId > slf4j-log4j12</ artifactId >
39 < version > 1.7.25</ version >
40 </ dependency >
41 <!-- 工具包 -->
42 < dependency >
43 < groupId > org.apache.commons</ groupId >
44 < artifactId > commons-lang3</ artifactId >
45 < version > 3.3.2</ version >
46 </ dependency >
47 <!-- spring集成 -->
48 < dependency >
49 < groupId > org.springframework.amqp</ groupId >
50 < artifactId > spring-rabbit</ artifactId >
51 < version > 1.7.6.RELEASE</ version >
52 </ dependency >
53 < dependency >
54 < groupId > org.springframework</ groupId >
55 < artifactId > spring-test</ artifactId >
56 < version > 4.3.7.RELEASE</ version >
57 </ dependency >
58 < dependency >
59 < groupId > junit</ groupId >
60 < artifactId > junit</ artifactId >
61 < version > RELEASE</ version >
62 < scope > compile</ scope >
63 </ dependency >
64 </ dependencies >
65
66 < build >
67 < pluginManagement > <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
68 < plugins >
69 <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
70 < plugin >
71 < artifactId > maven-clean-plugin</ artifactId >
72 < version > 3.1.0</ version >
73 </ plugin >
74 <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
75 < plugin >
76 < artifactId > maven-resources-plugin</ artifactId >
77 < version > 3.0.2</ version >
78 </ plugin >
79 < plugin >
80 < artifactId > maven-compiler-plugin</ artifactId >
81 < version > 3.8.0</ version >
82 </ plugin >
83 < plugin >
84 < artifactId > maven-surefire-plugin</ artifactId >
85 < version > 2.22.1</ version >
86 </ plugin >
87 < plugin >
88 < artifactId > maven-jar-plugin</ artifactId >
89 < version > 3.0.2</ version >
90 </ plugin >
91 < plugin >
92 < artifactId > maven-install-plugin</ artifactId >
93 < version > 2.5.2</ version >
94 </ plugin >
95 < plugin >
96 < artifactId > maven-deploy-plugin</ artifactId >
97 < version > 2.8.2</ version >
98 </ plugin >
99 <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
100 < plugin >
101 < artifactId > maven-site-plugin</ artifactId >
102 < version > 3.7.1</ version >
103 </ plugin >
104 < plugin >
105 < artifactId > maven-project-info-reports-plugin</ artifactId >
106 < version > 3.0.0</ version >
107 </ plugin >
108 </ plugins >
109 </ pluginManagement >
110 </ build >
111 </ project >
(2): 建立日志配置文件,在 resources 下建立 log4j.properties,便于打印精确的日志信息
1 log4j.rootLogger=DEBUG,A1
2 log4j.logger.com.edu=DEBUG
3 log4j.logger.org.mybatis=DEBUG
4 log4j.appender.A1=org.apache.log4j.ConsoleAppender
5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n
(3): 编写一个工具类,主要用于连接 RabbitMQ
1 package com.edu.util;
2
3
4 import com.rabbitmq.client.Connection;
5 import com.rabbitmq.client.ConnectionFactory;
6
7 /**
8 * @ClassName ConnectionUtil
9 * @Deccription 穿件连接的工具类
10 * @Author DZ
11 * @Date 2019/5/4 12:27
12 * */
13 public class ConnectionUtil {
14 /**
15 * 创建连接工具
16 *
17 * @return
18 * @throws Exception
19 */
20 public static Connection getConnection() throws Exception {
21 ConnectionFactory connectionFactory = new ConnectionFactory();
22 connectionFactory.setHost("127.0.0.1");// MQ的服务器
23 connectionFactory.setPort(5672);// 默认端口号
24 connectionFactory.setUsername("test");
25 connectionFactory.setPassword("test");
26 connectionFactory.setVirtualHost("/test");
27 return connectionFactory.newConnection();
28 }
29 }
项目总体图如下:
4.1.Hello World模式
此模式非常简单,一个生产者对应一个消费者
首先我们制造一个消息生产者,并发送消息:
1 package com.edu.hello;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription 创建发送者
10 * @Author DZ
11 * @Date 2019/5/4 12:45
12 * */
13 public class Sender {
14 private final static String QUEUE = "testhello"; // 队列的名字
15
16 public static void main(String[] srgs) throws Exception {
17 // 获取连接
18 Connection connection = ConnectionUtil.getConnection();
19 // 创建连接
20 Channel channel = connection.createChannel();
21 // 声明队列
22 // 参数1:队列的名字
23 // 参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存
24 // 参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问
25 // 参数4:是否自动删除
26 // 参数5:我们传入的其他参数
27 channel.queueDeclare(QUEUE, false , false , false , null );
28 // 发送内容
29 channel.basicPublish("", QUEUE, null , "要发送的消息".getBytes());
30 // 关闭连接
31 channel.close();
32 connection.close();
33 }
34 }
定义一个消息接受者
1 package com.edu.hello;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6 import com.rabbitmq.client.QueueingConsumer;
7
8 /**
9 * @ClassName Recver
10 * @Deccription 消息接受者
11 * @Author DZ
12 * @Date 2019/5/4 12:58
13 * */
14 public class Recver {
15 private final static String QUEUE = "testhello";// 消息队列的名称
16
17 public static void main(String[] args) throws Exception {
18 Connection connection = ConnectionUtil.getConnection();
19 Channel channel = connection.createChannel();
20 channel.queueDeclare(QUEUE, false , false , false , null );
21 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
22 // 接受消息,参数2表示自动确认消息
23 channel.basicConsume(QUEUE, true , queueingConsumer);
24 while (true ) {
25 // 获取消息
26 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();// 如果没有消息就等待,有消息就获取消息,并销毁,是一次性的
27 String message = new String(delivery.getBody());
28 System.out.println(message);
29 }
30 }
31 }
此种模式属于“点对点”模式,一个生产者、一个队列、一个消费者,可以运用在聊天室(实际上真实的聊天室比这复杂很多,虽然是“点对点”模式,但是并不是一个生产者,一个队列,一个消费者)
4.2.work queues
一个生产者对应多个消费者,但是只有一个消费者获得消息
定义消息制造者:
1 package com.edu.work;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription 创建发送者
10 * @Author DZ
11 * @Date 2019/5/4 12:45
12 * */
13 public class Sender {
14 private final static String QUEUE = "testhellowork"; // 队列的名字
15
16 public static void main(String[] srgs) throws Exception {
17 // 获取连接
18 Connection connection = ConnectionUtil.getConnection();
19 // 创建连接
20 Channel channel = connection.createChannel();
21 // 声明队列
22 // 参数1:队列的名字
23 // 参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存
24 // 参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问
25 // 参数4:是否自动删除
26 // 参数5:我们传入的其他参数
27 channel.queueDeclare(QUEUE, false , false , false , null );
28 // 发送内容
29 for (int i = 0; i < 100; i++) {
30 channel.basicPublish("", QUEUE, null , ("要发送的消息" + i).getBytes());
31 }
32 // 关闭连接
33 channel.close();
34 connection.close();
35 }
36 }
定义2个消息消费者
1 package com.edu.work;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7 import java.util.Queue;
8
9 /**
10 * @ClassName Recver1
11 * @Deccription 消息接受者
12 * @Author DZ
13 * @Date 2019/5/4 12:58
14 * */
15 public class Recver1 {
16 private final static String QUEUE = "testhellowork";// 消息队列的名称
17
18 public static void main(String[] args) throws Exception {
19 Connection connection = ConnectionUtil.getConnection();
20 final Channel channel = connection.createChannel();
21 channel.queueDeclare(QUEUE, false , false , false , null );
22 // channel.basicQos(1); // 告诉服务器,当前消息没有确认之前,不要发送新消息,合理自动分配资源
23 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
24 @Override
25 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
26 // 收到消息时候调用
27 System.out.println("消费者1收到的消息:" + new String(body));
28 /* super.handleDelivery(consumerTag, envelope, properties, body); */
29 // 确认消息
30 // 参数2:false为确认收到消息,ture为拒绝收到消息
31 channel.basicAck(envelope.getDeliveryTag(), false );
32 }
33 };
34 // 注册消费者
35 // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了
36 channel.basicConsume(QUEUE, false , defaultConsumer);
37 }
38 }
1 package com.edu.work;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription 消息接受者
11 * @Author DZ
12 * @Date 2019/5/4 12:58
13 * */
14 public class Recver2 {
15 private final static String QUEUE = "testhellowork";// 消息队列的名称
16
17 public static void main(String[] args) throws Exception {
18 Connection connection = ConnectionUtil.getConnection();
19 final Channel channel = connection.createChannel();
20 channel.queueDeclare(QUEUE, false , false , false , null );
21 // channel.basicQos(1);
22 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
23 @Override
24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
25 // 收到消息时候调用
26 System.out.println("消费者2收到的消息:" + new String(body));
27 /* super.handleDelivery(consumerTag, envelope, properties, body); */
28 // 确认消息
29 // 参数2:false为确认收到消息,ture为拒绝收到消息
30 channel.basicAck(envelope.getDeliveryTag(), false );
31 }
32 };
33 // 注册消费者
34 // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了
35 channel.basicConsume(QUEUE, false , defaultConsumer);
36 }
37 }
这种模式是最简单的 work 模式,消息发送者,循环发送了100次消息,打印结果如下:
可以看出,消息消费者消费到的消息是替换的,即一个消息只被消费了一次,且两个消费者各消费了50条消息。这里有个弊端,消息消费者发布消息的时候,无论消费者的消费能力如何(电脑的内存等硬件),消息只会均匀分布给各个消费者(可以给2个消费者 sleep 下,结果还是这样)。有没有什么方式可以让消息自动分配(按照电脑的硬件,能者多劳),答案是可以的,只需要增加 channel.basicQos(1);
此方案可以用来进行负载均衡,抢红包等场景
4.3.public模式
一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。X 表示交换器,在 RabbitMQ 中,交换器主要有四种类型: direct、fanout、topic、headers,这里的交换器是 fanout,其它类型的交换机自行谷歌,主要区别是交换机的匹配方式发生了变化。
定义消息发布者
1 package com.edu.publish;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 14:43
12 * */
13 public class Sender {
14 private final static String EXCHANGE_NAME = "testexchange";// 定义交换机名字
15
16 public static void main(String[] args) throws Exception {
17 Connection connection = ConnectionUtil.getConnection();
18 Channel channel = connection.createChannel();
19 // 声明交换机
20 // 定义一个交换机,类型为fanout,也就是发布订阅者模式
21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22 // 发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息会丢失
23 channel.basicPublish(EXCHANGE_NAME, "", null , "发布订阅模式的消息".getBytes());
24 channel.close();
25 connection.close();
26 }
27 }
定义2个消息消费者
1 package com.edu.publish;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 * */
14 public class Recver1 {
15 // 定义交换机
16 private final static String EXCHANGE_NAME = "testexchange";
17 private final static String QUEUE = "testpubqueue1";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false , false , false , null );
23 // 绑定队列到交换机
24 channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25 channel.basicQos(1);
26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27 @Override
28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
29 /* super.handleDelivery(consumerTag, envelope, properties, body); */
30 System.out.println("消费者1:" + new String(body));
31 channel.basicAck(envelope.getDeliveryTag(), false );
32 }
33 };
34 channel.basicConsume(QUEUE, false , defaultConsumer);
35 }
36 }
1 package com.edu.publish;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 * */
14 public class Recver2 {
15 // 定义交换机
16 private final static String EXCHANGE_NAME = "testexchange";
17 private final static String QUEUE = "testpubqueue2";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false , false , false , null );
23 // 绑定队列到交换机
24 channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25 channel.basicQos(1);
26 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27 @Override
28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
29 /* super.handleDelivery(consumerTag, envelope, properties, body); */
30 System.out.println("消费者2:" + new String(body));
31 channel.basicAck(envelope.getDeliveryTag(), false );
32 }
33 };
34 channel.basicConsume(QUEUE, false , defaultConsumer);
35 }
36 }
消费者1 和消费者2 都监听了被同一个交换器绑定的队列,因此消息被同时消费到了。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。
应用场景:比如一个商城系统需要在管理员上传商品新的图片时,前台系统必须更新图片,日志系统必须记录相应的日志,那么就可以将两个队列绑定到图片上传交换器上,一个用于前台系统更新图片,另一个用于日志系统记录日志。
4.4.routing
" alt="" data-src="https://user-gold-cdn.xitu.io/2019/5/4/16a8283b10501856?imageView2/0/w/1280/h/960/format/webp/ignore-error/1" data-width="478" data-height="177" />生产者将消息发送到 direct 交换器,在绑定队列和交换器的时候有一个路由 key,生产者发送的消息会指定一个路由 key,那么消息只会发送到相应 key 相同的队列,接着监听该队列的消费者消费消息。
定义消息发布者
1 package com.edu.route;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 15:05
12 * */
13 public class Sender {
14 private final static String EXCANGE_NAME = "testroute";
15
16 public static void main(String[] args) throws Exception {
17 Connection connection = ConnectionUtil.getConnection();
18 Channel channel = connection.createChannel();
19 // 定义路由格式的交换机
20 channel.exchangeDeclare(EXCANGE_NAME, "direct");
21 channel.basicPublish(EXCANGE_NAME, "key2", null , "路由模式的消息".getBytes());
22 channel.close();
23 connection.close();
24 }
25 }
1 package com.edu.route;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 * */
14 public class Recver1 {
15 // 定义交换机
16 private final static String EXCHANGE_NAME = "testroute";
17 private final static String QUEUE = "testroute1queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false , false , false , null );
23 // 绑定队列到交换机
24 // 参数3:绑定到交换机指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26 // 如果需要绑定多个路由,再绑定一次即可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key2");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body); */
33 System.out.println("消费者1:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false );
35 }
36 };
37 channel.basicConsume(QUEUE, false , defaultConsumer);
38 }
39 }
1 package com.edu.route;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 * */
14 public class Recver2 {
15 // 定义交换机
16 private final static String EXCHANGE_NAME = "testroute";
17 private final static String QUEUE = "testroute2queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false , false , false , null );
23 // 绑定队列到交换机
24 // 参数3:绑定到交换机指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26 // 如果需要绑定多个路由,再绑定一次即可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "key3");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body); */
33 System.out.println("消费者2:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false );
35 }
36 };
37 channel.basicConsume(QUEUE, false , defaultConsumer);
38 }
39 }
应用场景:利用消费者能够有选择性的接收消息的特性,比如我们商城系统的后台管理系统对于商品进行修改、删除、新增操作都需要更新前台系统的界面展示,而查询操作确不需要,那么这两个队列分开接收消息就比较好。
4.5.Topic
上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。符号 “#” 表示匹配一个或多个词,符号 “*” 表示匹配一个词。实际上 Topic 模式是 routing 模式的扩展
1 package com.edu.topic;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.Channel;
5 import com.rabbitmq.client.Connection;
6
7 /**
8 * @ClassName Sender
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 15:19
12 * */
13 public class Sender {
14 private final static String EXCANGE_NAME = "testtopexchange";
15
16 public static void main(String[] args) throws Exception {
17 Connection connection = ConnectionUtil.getConnection();
18 Channel channel = connection.createChannel();
19 channel.exchangeDeclare(EXCANGE_NAME, "topic");
20 channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null , "topic模式消息发送者:".getBytes());
21 channel.close();
22 connection.close();
23 }
24 }
1 package com.edu.topic;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 * */
14 public class Recver1 {
15 // 定义交换机
16 private final static String EXCHANGE_NAME = "testtopexchange";
17 private final static String QUEUE = "testtopic1queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false , false , false , null );
23 // 绑定队列到交换机
24 // 参数3:绑定到交换机指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26 // 如果需要绑定多个路由,再绑定一次即可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body); */
33 System.out.println("消费者1:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false );
35 }
36 };
37 channel.basicConsume(QUEUE, false , defaultConsumer);
38 }
39 }
1 package com.edu.topic;
2
3 import com.edu.util.ConnectionUtil;
4 import com.rabbitmq.client.*;
5
6 import java.io.IOException;
7
8 /**
9 * @ClassName Recver1
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 14:49
13 * */
14 public class Recver2 {
15 // 定义交换机
16 private final static String EXCHANGE_NAME = "testtopexchange";
17 private final static String QUEUE = "testtopic2queue";
18
19 public static void main(String[] args) throws Exception {
20 Connection connection = ConnectionUtil.getConnection();
21 final Channel channel = connection.createChannel();
22 channel.queueDeclare(QUEUE, false , false , false , null );
23 // 绑定队列到交换机
24 // 参数3:绑定到交换机指定的路由的名字
25 channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26 // 如果需要绑定多个路由,再绑定一次即可
27 channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#");
28 channel.basicQos(1);
29 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30 @Override
31 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException {
32 /* super.handleDelivery(consumerTag, envelope, properties, body); */
33 System.out.println("消费者2:" + new String(body));
34 channel.basicAck(envelope.getDeliveryTag(), false );
35 }
36 };
37 channel.basicConsume(QUEUE, false , defaultConsumer);
38 }
39 }
第六种模式是将上述的模式集成其它的框架,进行远程访问,这里我们将集成 Spring 实现 RCP 远程模式的使用
5.Spring 集成 RabbitMQ
5.1.自动集成 Spring
编写spring的配置,此配置文件的目的是将 Spring 与 RabbitMQ 进行整合,实际上就是将 MQ 的相关信息(连接,队列,交换机……)通过XML配置的方式实现
1 < beans xmlns ="http://www.springframework.org/schema/beans"
2 xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"
3 xmlns:rabbit ="http://www.springframework.org/schema/rabbit"
4 xsi:schemaLocation ="http://www.springframework.org/schema/rabbit
5 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
6 http://www.springframework.org/schema/beans
7 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd" >
8 <!-- 定义连接工厂 -->
9 < rabbit:connection-factory id ="connectionFactory" host ="127.0.0.1" port ="5672" username ="test" password ="test"
10 virtual-host ="/test" />
11 <!--
12 定义模板
13 第三个参数,决定消息发送到哪里,如果为exchange,则发送到交换机;如果为queue,则发送到队列
14 -->
15 < rabbit:template id ="template" connection-factory ="connectionFactory" exchange ="fanoutExchange" />
16 < rabbit:admin connection-factory ="connectionFactory" />
17 <!-- 定义队列 -->
18 < rabbit:queue name ="myQueue" auto-declare ="true" />
19 <!-- 定义交换机 -->
20 < rabbit:fanout-exchange name ="fanoutExchange" auto-declare ="true" >
21 <!-- 将消息绑定到交换机 -->
22 < rabbit:bindings >
23 < rabbit:binding queue ="myQueue" >
24
25 </ rabbit:binding >
26 </ rabbit:bindings >
27 </ rabbit:fanout-exchange >
28 <!-- 定义监听器,收到消息会执行 -->
29 < rabbit:listener-container connection-factory ="connectionFactory" >
30 <!-- 定义监听的类和方法 -->
31 < rabbit:listener ref ="consumer" method ="test" queue-names ="myQueue" />
32 </ rabbit:listener-container >
33 <!-- 定义消费者 -->
34 < bean id ="consumer" class ="com.edu.spring.MyConsumer" />
35
36 </ beans >
生产者:
1 package com.edu.spring;
2
3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
4 import org.springframework.context.ApplicationContext;
5 import org.springframework.context.support.ClassPathXmlApplicationContext;
6
7 /**
8 * @ClassName SpringTest
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 18:40
12 * */
13 public class SpringTest {
14 public static void main(String[] args) throws Exception {
15 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
16 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class );
17 rabbitTemplate.convertAndSend("Spring的消息");
18 ((ClassPathXmlApplicationContext) applicationContext).destroy();
19 }
20 }
消费者
1 package com.edu.spring;
2
3 /**
4 * @ClassName MyConsumer
5 * @Deccription TODO
6 * @Author DZ
7 * @Date 2019/5/4 18:35
8 * */
9 public class MyConsumer {
10 /* 用于接收消息 */
11 public void test(String message) {
12 System.err.println(message);
13 }
14 }
集成Spring主要是在xml中实现了队列和交换机的创建。
最好能理解上面的图。理解后,以后写相关的代码,直接去网上 copy 一份配置文件,然后根据自己项目的情况进行修改。如果不能理解,就不知道如何修改出现错误后不知道错误出现在什么地方。
5.2.手动模式
手动模式,主要增加MQ的回调操作,MQ消息失败或者成功就有相应的回调信息,增强系统的健壮性,一旦产生异常,很快就能定位到异常的位置,所以在实际开发中,一般都这种方式
创建xml配置文件
1 < beans xmlns ="http://www.springframework.org/schema/beans"
2 xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"
3 xmlns:rabbit ="http://www.springframework.org/schema/rabbit"
4 xmlns:context ="http://www.springframework.org/schema/context"
5 xsi:schemaLocation ="http://www.springframework.org/schema/rabbit
6 http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
7 http://www.springframework.org/schema/beans
8 http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
9 http://www.springframework.org/schema/context
10 http://www.springframework.org/schema/context/spring-context-4.3.xsd" >
11 < context:component-scan base-package ="com.edu.spring2" />
12 < bean id ="jsonMessageConverter" class ="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
13
14 <!--
15 定义连接工厂
16 publisher-confirms为ture,确认失败等回调才会执行
17 -->
18 < rabbit:connection-factory id ="connectionFactory" host ="127.0.0.1" port ="5672" username ="test" password ="test"
19 virtual-host ="/test" publisher-confirms ="true" />
20
21 < rabbit:admin connection-factory ="connectionFactory" />
22 < rabbit:template id ="amqpTemplate" connection-factory ="connectionFactory" confirm-callback ="confirmCallBackListener"
23 return-callback ="returnCallBackListener"
24 mandatory ="true" />
25 <!-- 定义队列 -->
26 < rabbit:queue name ="myQueue" auto-declare ="true" />
27 <!-- 定义交换机 -->
28 < rabbit:direct-exchange name ="DIRECT_EX" id ="DIRECT_EX" >
29 <!-- 将消息绑定到交换机 -->
30 < rabbit:bindings >
31 < rabbit:binding queue ="myQueue" >
32
33 </ rabbit:binding >
34 </ rabbit:bindings >
35 </ rabbit:direct-exchange >
36 <!-- 定义监听器,收到消息会执行 -->
37 < rabbit:listener-container connection-factory ="connectionFactory" acknowledge ="manual" >
38 <!-- 定义监听的类和方法 -->
39 < rabbit:listener queues ="myQueue" ref ="receiveConfirmTestListener" />
40 </ rabbit:listener-container >
41
42 </ beans >
创建回调监听函数
1 package com.edu.spring2;
2
3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
4 import org.springframework.amqp.rabbit.support.CorrelationData;
5 import org.springframework.stereotype.Component;
6
7 /**
8 * @ClassName ConfirmCallBackListener
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 22:26
12 * */
13 @Component("confirmCallBackListener")
14 public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
15
16 @Override
17 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
18 System.out.println("确认回调 ack==" + ack + "回调原因==" + cause);
19 }
20 }
1 package com.edu.spring2;
2
3 import com.rabbitmq.client.Channel;
4 import org.springframework.amqp.core.Message;
5 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
6 import org.springframework.stereotype.Component;
7
8 /**
9 * @ClassName ReceiveConfirmTestListener
10 * @Deccription TODO
11 * @Author DZ
12 * @Date 2019/5/4 22:24
13 * */
14 @Component("receiveConfirmTestListener")
15 public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
16 /**
17 * 收到消息时,执行的监听
18 *
19 * @param message
20 * @param channel
21 * @throws Exception
22 */
23 @Override
24 public void onMessage(Message message, Channel channel) throws Exception {
25 System.out.println(("消费者收到了消息" + message));
26 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false );
27 }
28 }
1 package com.edu.spring2;
2
3 import org.springframework.amqp.core.Message;
4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
5 import org.springframework.stereotype.Component;
6
7 /**
8 * @ClassName ReturnCallBackListener
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 22:28
12 * */
13 @Component("returnCallBackListener")
14 public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
15 @Override
16 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
17 System.out.println("失败回调" + message);
18 }
19 }
回调函数的配置来自 XML
创建发送消息的工具类
1 package com.edu.spring2;
2
3 import org.springframework.amqp.core.AmqpTemplate;
4 import org.springframework.beans.factory.annotation.Autowired;
5 import org.springframework.stereotype.Component;
6
7 /**
8 * @ClassName PublicUtil
9 * @Deccription TODO
10 * @Author DZ
11 * @Date 2019/5/4 22:30
12 * */
13 @Component("publicUtil")
14 public class PublicUtil {
15 @Autowired
16 private AmqpTemplate amqpTemplate;
17
18 public void send(String excange, String routingkey, Object message) {
19 amqpTemplate.convertAndSend(excange, routingkey, message);
20 }
21 }
创建测试类
1 package com.edu.spring2;
2
3 import org.junit.Test;
4 import org.junit.runner.RunWith;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.test.context.ContextConfiguration;
7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
8
9 /**
10 * @ClassName TestMain
11 * @Deccription TODO
12 * @Author DZ
13 * @Date 2019/5/4 22:32
14 * */
15 @RunWith(SpringJUnit4ClassRunner.class )
16 @ContextConfiguration(locations = {"classpath:applicationContext2.xml"})
17 public class TestMain {
18 @Autowired
19 private PublicUtil publicUtil;
20 private static String exChange = "DIRECT_EX";// 交换机
21 private static String queue = "myQueue";
22
23 /**
24 * exChange和queue均正确
25 * confirm会执行,ack = ture
26 * 消息正常接收(接收消息确认方法正常执行)
27 */
28 @Test
29 public void test1() throws Exception {
30 publicUtil.send(exChange, queue, "测试1,队列和交换机均正确");
31 }
32 /**
33 * exChange错误,queue正确
34 * confirm执行,ack=false
35 * 消息无法接收(接收消息确认方法不能执行)
36 */
37 @Test
38 public void test2() throws Exception {
39 publicUtil.send(exChange + "1", queue, "测试2,队列正确,交换机错误");
40 }
41 /**
42 * exChange正常,queue错误
43 * return执行
44 * confirm执行,ack=ture
45 */
46 @Test
47 public void test3() throws Exception {
48 publicUtil.send(exChange, queue + "1", "测试2,队列错误,交换机正确");
49 }
50 /**
51 * exChange错误,queue错误
52 * confirm执行,ack=false
53 */
54 @Test
55 public void test4() throws Exception {
56 publicUtil.send(exChange + "1", queue + "1", "测试2,队列错误,交换机错误");
57 }
58 }
测试结果如下:
test1:exChange和queue均正确
confirm会执行,ack=ture;能正常收到消息(接收消息的方法正常执行)
test2:exChange错误,queue正确
confirm执行,ack=false;不能正常接收到消息
confirm执行,ack=ture;return执行;不能接收到消息
confirm执行,ack=false;不能接收消息
上述结论及代码如下图:
根据上述的测试结果,我们可以根据回调函数的返回结果,查看MQ的错误出现在那里。根据上述结论,我们可以对3个回调函数做如下处理:
类 ReceiveConfirmTestListener 中的onMessage方法主要用于接收从 RabbitMQ 推送过来的消息,并对消息做相应的逻辑处理
类 ConfirmCallBackListener 中的 confirm 方法主要用于检查交换机(exChange),当 ack=false,交换机可能错误
类 ReturnCallBackListener 中的 returnedMessage 方法用于检查队列(queue),当此方法执行时,队列可能错误
所以3个相应的方法可以做如下调整:
实际上,在真实项目中,上面3个方法也是按照这3个逻辑进行设计的。当然这3个方法中还可以加入更多的日志消息,和逻辑处理业务。
6.参考
https://blog.csdn.net/liu911025/article/details/80460182
https://blog.csdn.net/lyhkmm/article/details/78775369
https://blog.csdn.net/vbirdbest/article/details/78670550
https://blog.csdn.net/vbirdbest/article/details/78670550
https://www.rabbitmq.com/getstarted.html