java基础(六):RabbitMQ 入门

2019-05-08 07:30:05来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

 建议先了解为什么项目要使用 MQ 消息队列,MQ 消息队列有什么优点,如果在业务逻辑上没有此种需求,建议不要使用中间件。中间件对系统的性能做优化的同时,同时增加了系统的复杂性也维护难易度;其次,需要了解各种常见的 MQ 消息队列有什么区别,以便在相同的成本下选择一种最合适本系统的技术。

本文主要讨论 RabbitMQ,从3月底接触一个项目使用了 RabbitMQ,就开始着手学习,主要通过视频和博客学习了一个月,基本明白了 RabbitMQ 的应用,其它的 MQ 队列还不清楚,其底层技术还有待学习,以下是我目前的学习心得。

1.安装 Erlang

RabbitMQ 是基于 Erlang 语言写的,所以首先安装 Erlang,本例是在 Windows 上安装,也可以选择在 Linux 上安装,机器上没有虚拟机,直接在 Windows 上操作,建议在 Linux 上安装。官方下载 Erlang 软件,我下载最新版本 21.3。安装过程很简单,直接 Next 到底。 Linux 安装自行谷歌。如下图:



安装结束后,设置环境变量,如下图
 
测试是否安装成功

2.安装 RabbitMQ

在官方下载,选择最新版本 3.7。安装过程很简单,直接 Next 到底。如下图:


测试安装是否成功,进入安装目录 sbin,执行 rabbitmq-plugins enable rabbitmq_management 命令,出现下面界面,证明安装成功(建议以管理员方式打开 dos)。
 

执行 rabbitmq-server start 命令,启动服务。本地登陆并创建用户,如下图:

关于tags标签的解释:

1、  超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

2、  监控者(monitoring)

可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

3、  策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

4、  普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

5、  其他

无法登陆管理控制台,通常就是普通的生产者和消费者。

4.JAVA 操作RabbitMQ

参考 RabbitMQ 官网,一共分为6个模式

RabbitMQ 是一个消息代理,实际上,它接收生产者产生的消息,然后将消息传递给消费者。在这个过程中,它可以路由、缓冲、持久化等,在传输过程中,主要又三部分组成。

生产者:发送消息的一端

队列:它活动在 RabbitMQ 服务器中,消息存储的地方,队列本质上是一个缓冲对象,所以存储的消息不受限制
消费者:消息接收端
一般情况下,消息生产者、消费者和队列不在同一台服务器上,本地做测试,放在一台服务器上。 测试项目直接创建一个 maven 格式的项目,没必要创建网络格式。新建一个项目,如下图:首先准备操作 MQ 的环境

(1): 准备必要的 Pom 文件,导入相应的 jar 包,

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 
  3 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5   <modelVersion>4.0.0</modelVersion>
  6 
  7   <groupId>com.edu</groupId>
  8   <artifactId>rabbitmqdemo</artifactId>
  9   <version>1.0</version>
 10 
 11   <name>rabbitmqdemo</name>
 12   <!-- FIXME change it to the project's website -->
 13   <url>http://www.example.com</url>
 14 
 15   <properties>
 16     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 17     <maven.compiler.source>1.7</maven.compiler.source>
 18     <maven.compiler.target>1.7</maven.compiler.target>
 19   </properties>
 20 
 21   <dependencies>
 22     <!--测试包-->
 23     <dependency>
 24       <groupId>junit</groupId>
 25       <artifactId>junit</artifactId>
 26       <version>4.11</version>
 27       <scope>test</scope>
 28     </dependency>
 29     <!--mq客户端-->
 30     <dependency>
 31       <groupId>com.rabbitmq</groupId>
 32       <artifactId>amqp-client</artifactId>
 33       <version>4.5.0</version>
 34     </dependency>
 35     <!--日志-->
 36     <dependency>
 37       <groupId>org.slf4j</groupId>
 38       <artifactId>slf4j-log4j12</artifactId>
 39       <version>1.7.25</version>
 40     </dependency>
 41     <!--工具包-->
 42     <dependency>
 43       <groupId>org.apache.commons</groupId>
 44       <artifactId>commons-lang3</artifactId>
 45       <version>3.3.2</version>
 46     </dependency>
 47     <!--spring集成-->
 48     <dependency>
 49       <groupId>org.springframework.amqp</groupId>
 50       <artifactId>spring-rabbit</artifactId>
 51       <version>1.7.6.RELEASE</version>
 52     </dependency>
 53     <dependency>
 54       <groupId>org.springframework</groupId>
 55       <artifactId>spring-test</artifactId>
 56       <version>4.3.7.RELEASE</version>
 57     </dependency>
 58       <dependency>
 59           <groupId>junit</groupId>
 60           <artifactId>junit</artifactId>
 61           <version>RELEASE</version>
 62           <scope>compile</scope>
 63       </dependency>
 64   </dependencies>
 65 
 66   <build>
 67     <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
 68       <plugins>
 69         <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
 70         <plugin>
 71           <artifactId>maven-clean-plugin</artifactId>
 72           <version>3.1.0</version>
 73         </plugin>
 74         <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
 75         <plugin>
 76           <artifactId>maven-resources-plugin</artifactId>
 77           <version>3.0.2</version>
 78         </plugin>
 79         <plugin>
 80           <artifactId>maven-compiler-plugin</artifactId>
 81           <version>3.8.0</version>
 82         </plugin>
 83         <plugin>
 84           <artifactId>maven-surefire-plugin</artifactId>
 85           <version>2.22.1</version>
 86         </plugin>
 87         <plugin>
 88           <artifactId>maven-jar-plugin</artifactId>
 89           <version>3.0.2</version>
 90         </plugin>
 91         <plugin>
 92           <artifactId>maven-install-plugin</artifactId>
 93           <version>2.5.2</version>
 94         </plugin>
 95         <plugin>
 96           <artifactId>maven-deploy-plugin</artifactId>
 97           <version>2.8.2</version>
 98         </plugin>
 99         <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
100         <plugin>
101           <artifactId>maven-site-plugin</artifactId>
102           <version>3.7.1</version>
103         </plugin>
104         <plugin>
105           <artifactId>maven-project-info-reports-plugin</artifactId>
106           <version>3.0.0</version>
107         </plugin>
108       </plugins>
109     </pluginManagement>
110   </build>
111 </project>

 

(2): 建立日志配置文件,在 resources 下建立 log4j.properties,便于打印精确的日志信息

1 log4j.rootLogger=DEBUG,A1
2 log4j.logger.com.edu=DEBUG
3 log4j.logger.org.mybatis=DEBUG
4 log4j.appender.A1=org.apache.log4j.ConsoleAppender
5 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
6 log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-%m%n

(3): 编写一个工具类,主要用于连接 RabbitMQ

 1 package com.edu.util;
 2 
 3 
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 /**
 8  * @ClassName ConnectionUtil
 9  * @Deccription 穿件连接的工具类
10  * @Author DZ
11  * @Date 2019/5/4 12:27
12  **/
13 public class ConnectionUtil {
14     /**
15      * 创建连接工具
16      *
17      * @return
18      * @throws Exception
19      */
20     public static Connection getConnection() throws Exception {
21         ConnectionFactory connectionFactory = new ConnectionFactory();
22         connectionFactory.setHost("127.0.0.1");//MQ的服务器
23         connectionFactory.setPort(5672);//默认端口号
24         connectionFactory.setUsername("test");
25         connectionFactory.setPassword("test");
26         connectionFactory.setVirtualHost("/test");
27         return connectionFactory.newConnection();
28     }
29 }

项目总体图如下:

4.1.Hello World模式

此模式非常简单,一个生产者对应一个消费者

首先我们制造一个消息生产者,并发送消息:
 1 package com.edu.hello;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription 创建发送者
10  * @Author DZ
11  * @Date 2019/5/4 12:45
12  **/
13 public class Sender {
14     private final static String QUEUE = "testhello"; //队列的名字
15 
16     public static void main(String[] srgs) throws Exception {
17         //获取连接
18         Connection connection = ConnectionUtil.getConnection();
19         //创建连接
20         Channel channel = connection.createChannel();
21         //声明队列
22         //参数1:队列的名字
23         //参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存
24         //参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问
25         //参数4:是否自动删除
26         //参数5:我们传入的其他参数
27         channel.queueDeclare(QUEUE, false, false, false, null);
28         //发送内容
29         channel.basicPublish("", QUEUE, null, "要发送的消息".getBytes());
30         //关闭连接
31         channel.close();
32         connection.close();
33     }
34 }

定义一个消息接受者

 1 package com.edu.hello;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 /**
 9  * @ClassName Recver
10  * @Deccription 消息接受者
11  * @Author DZ
12  * @Date 2019/5/4 12:58
13  **/
14 public class Recver {
15     private final static String QUEUE = "testhello";//消息队列的名称
16 
17     public static void main(String[] args) throws Exception {
18         Connection connection = ConnectionUtil.getConnection();
19         Channel channel = connection.createChannel();
20         channel.queueDeclare(QUEUE, false, false, false, null);
21         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
22         //接受消息,参数2表示自动确认消息
23         channel.basicConsume(QUEUE, true, queueingConsumer);
24         while (true) {
25             //获取消息
26             QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();//如果没有消息就等待,有消息就获取消息,并销毁,是一次性的
27             String message = new String(delivery.getBody());
28             System.out.println(message);
29         }
30     }
31 }

此种模式属于“点对点”模式,一个生产者、一个队列、一个消费者,可以运用在聊天室(实际上真实的聊天室比这复杂很多,虽然是“点对点”模式,但是并不是一个生产者,一个队列,一个消费者)

4.2.work queues

一个生产者对应多个消费者,但是只有一个消费者获得消息

定义消息制造者:

 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription 创建发送者
10  * @Author DZ
11  * @Date 2019/5/4 12:45
12  **/
13 public class Sender {
14     private final static String QUEUE = "testhellowork"; //队列的名字
15 
16     public static void main(String[] srgs) throws Exception {
17         //获取连接
18         Connection connection = ConnectionUtil.getConnection();
19         //创建连接
20         Channel channel = connection.createChannel();
21         //声明队列
22         //参数1:队列的名字
23         //参数2:是否持久化队列,我们的队列存在内存中,如果mq重启则丢失。如果为ture,则保存在erlang的数据库中,重启,依旧保存
24         //参数3:是否排外,我们连接关闭后是否自动删除队列,是否私有当前队列,如果私有,其他队列不能访问
25         //参数4:是否自动删除
26         //参数5:我们传入的其他参数
27         channel.queueDeclare(QUEUE, false, false, false, null);
28         //发送内容
29         for (int i = 0; i < 100; i++) {
30             channel.basicPublish("", QUEUE, null, ("要发送的消息" + i).getBytes());
31         }
32         //关闭连接
33         channel.close();
34         connection.close();
35     }
36 }
 

定义2个消息消费者

 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 import java.util.Queue;
 8 
 9 /**
10  * @ClassName Recver1
11  * @Deccription 消息接受者
12  * @Author DZ
13  * @Date 2019/5/4 12:58
14  **/
15 public class Recver1 {
16     private final static String QUEUE = "testhellowork";//消息队列的名称
17 
18     public static void main(String[] args) throws Exception {
19         Connection connection = ConnectionUtil.getConnection();
20         final Channel channel = connection.createChannel();
21         channel.queueDeclare(QUEUE, false, false, false, null);
22         //channel.basicQos(1);//告诉服务器,当前消息没有确认之前,不要发送新消息,合理自动分配资源
23         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
24             @Override
25             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
26                 //收到消息时候调用
27                 System.out.println("消费者1收到的消息:" + new String(body));
28                 /*super.handleDelivery(consumerTag, envelope, properties, body);*/
29                 //确认消息
30                 //参数2:false为确认收到消息,ture为拒绝收到消息
31                 channel.basicAck(envelope.getDeliveryTag(), false);
32             }
33         };
34         //注册消费者
35         // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了
36         channel.basicConsume(QUEUE, false, defaultConsumer);
37     }
38 }
 1 package com.edu.work;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription 消息接受者
11  * @Author DZ
12  * @Date 2019/5/4 12:58
13  **/
14 public class Recver2 {
15     private final static String QUEUE = "testhellowork";//消息队列的名称
16 
17     public static void main(String[] args) throws Exception {
18         Connection connection = ConnectionUtil.getConnection();
19         final Channel channel = connection.createChannel();
20         channel.queueDeclare(QUEUE, false, false, false, null);
21         //channel.basicQos(1);
22         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
23             @Override
24             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
25                 //收到消息时候调用
26                 System.out.println("消费者2收到的消息:" + new String(body));
27                 /*super.handleDelivery(consumerTag, envelope, properties, body);*/
28                 //确认消息
29                 //参数2:false为确认收到消息,ture为拒绝收到消息
30                 channel.basicAck(envelope.getDeliveryTag(), false);
31             }
32         };
33         //注册消费者
34         // 参数2:手动确认,我们收到消息后,需要手动确认,告诉服务器,我们收到消息了
35         channel.basicConsume(QUEUE, false, defaultConsumer);
36     }
37 }

 

这种模式是最简单的 work 模式,消息发送者,循环发送了100次消息,打印结果如下:

可以看出,消息消费者消费到的消息是替换的,即一个消息只被消费了一次,且两个消费者各消费了50条消息。这里有个弊端,消息消费者发布消息的时候,无论消费者的消费能力如何(电脑的内存等硬件),消息只会均匀分布给各个消费者(可以给2个消费者 sleep 下,结果还是这样)。有没有什么方式可以让消息自动分配(按照电脑的硬件,能者多劳),答案是可以的,只需要增加 channel.basicQos(1);此方案可以用来进行负载均衡,抢红包等场景

4.3.public模式

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。X 表示交换器,在 RabbitMQ 中,交换器主要有四种类型: direct、fanout、topic、headers,这里的交换器是 fanout,其它类型的交换机自行谷歌,主要区别是交换机的匹配方式发生了变化。

定义消息发布者

 1 package com.edu.publish;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 14:43
12  **/
13 public class Sender {
14     private final static String EXCHANGE_NAME = "testexchange";//定义交换机名字
15 
16     public static void main(String[] args) throws Exception {
17         Connection connection = ConnectionUtil.getConnection();
18         Channel channel = connection.createChannel();
19         //声明交换机
20         //定义一个交换机,类型为fanout,也就是发布订阅者模式
21         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
22         //发布订阅模式,因为消息是先发布到交换机中,而交换机是没有保存功能的,所以如果没有消费者,消息会丢失
23         channel.basicPublish(EXCHANGE_NAME, "", null, "发布订阅模式的消息".getBytes());
24         channel.close();
25         connection.close();
26     }
27 }
定义2个消息消费者
 1 package com.edu.publish;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver1 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testexchange";
17     private final static String QUEUE = "testpubqueue1";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25         channel.basicQos(1);
26         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
30                 System.out.println("消费者1:" + new String(body));
31                 channel.basicAck(envelope.getDeliveryTag(), false);
32             }
33         };
34         channel.basicConsume(QUEUE, false, defaultConsumer);
35     }
36 }
 1 package com.edu.publish;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver2 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testexchange";
17     private final static String QUEUE = "testpubqueue2";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         channel.queueBind(QUEUE, EXCHANGE_NAME, "");
25         channel.basicQos(1);
26         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
27             @Override
28             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
30                 System.out.println("消费者2:" + new String(body));
31                 channel.basicAck(envelope.getDeliveryTag(), false);
32             }
33         };
34         channel.basicConsume(QUEUE, false, defaultConsumer);
35     }
36 }

 

消费者1 和消费者2 都监听了被同一个交换器绑定的队列,因此消息被同时消费到了。如果消息发送到没有队列绑定的交换器时,消息将丢失,因为交换器没有存储消息的能力,消息只能存储在队列中。

应用场景:比如一个商城系统需要在管理员上传商品新的图片时,前台系统必须更新图片,日志系统必须记录相应的日志,那么就可以将两个队列绑定到图片上传交换器上,一个用于前台系统更新图片,另一个用于日志系统记录日志。

4.4.routing

" alt="" data-src="https://user-gold-cdn.xitu.io/2019/5/4/16a8283b10501856?imageView2/0/w/1280/h/960/format/webp/ignore-error/1" data-width="478" data-height="177" />生产者将消息发送到 direct 交换器,在绑定队列和交换器的时候有一个路由 key,生产者发送的消息会指定一个路由 key,那么消息只会发送到相应 key 相同的队列,接着监听该队列的消费者消费消息。

定义消息发布者

 1 package com.edu.route;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 15:05
12  **/
13 public class Sender {
14     private final static String EXCANGE_NAME = "testroute";
15 
16     public static void main(String[] args) throws Exception {
17         Connection connection = ConnectionUtil.getConnection();
18         Channel channel = connection.createChannel();
19         //定义路由格式的交换机
20         channel.exchangeDeclare(EXCANGE_NAME, "direct");
21         channel.basicPublish(EXCANGE_NAME, "key2", null, "路由模式的消息".getBytes());
22         channel.close();
23         connection.close();
24     }
25 }
 1 package com.edu.route;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver1 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testroute";
17     private final static String QUEUE = "testroute1queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "key2");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者1:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }
 1 package com.edu.route;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver2 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testroute";
17     private final static String QUEUE = "testroute2queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key1");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "key3");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者2:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }

 

应用场景:利用消费者能够有选择性的接收消息的特性,比如我们商城系统的后台管理系统对于商品进行修改、删除、新增操作都需要更新前台系统的界面展示,而查询操作确不需要,那么这两个队列分开接收消息就比较好。

4.5.Topic

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。符号 “#” 表示匹配一个或多个词,符号 “*” 表示匹配一个词。实际上 Topic 模式是 routing 模式的扩展
 1 package com.edu.topic;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 
 7 /**
 8  * @ClassName Sender
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 15:19
12  **/
13 public class Sender {
14     private final static String EXCANGE_NAME = "testtopexchange";
15 
16     public static void main(String[] args) throws Exception {
17         Connection connection = ConnectionUtil.getConnection();
18         Channel channel = connection.createChannel();
19         channel.exchangeDeclare(EXCANGE_NAME, "topic");
20         channel.basicPublish(EXCANGE_NAME, "abc.adb.1", null, "topic模式消息发送者:".getBytes());
21         channel.close();
22         connection.close();
23     }
24 }
 1 package com.edu.topic;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver1 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testtopexchange";
17     private final static String QUEUE = "testtopic1queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.*");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者1:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }
 1 package com.edu.topic;
 2 
 3 import com.edu.util.ConnectionUtil;
 4 import com.rabbitmq.client.*;
 5 
 6 import java.io.IOException;
 7 
 8 /**
 9  * @ClassName Recver1
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 14:49
13  **/
14 public class Recver2 {
15     //定义交换机
16     private final static String EXCHANGE_NAME = "testtopexchange";
17     private final static String QUEUE = "testtopic2queue";
18 
19     public static void main(String[] args) throws Exception {
20         Connection connection = ConnectionUtil.getConnection();
21         final Channel channel = connection.createChannel();
22         channel.queueDeclare(QUEUE, false, false, false, null);
23         //绑定队列到交换机
24         //参数3:绑定到交换机指定的路由的名字
25         channel.queueBind(QUEUE, EXCHANGE_NAME, "key.*");
26         //如果需要绑定多个路由,再绑定一次即可
27         channel.queueBind(QUEUE, EXCHANGE_NAME, "abc.#");
28         channel.basicQos(1);
29         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
30             @Override
31             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
32                 /* super.handleDelivery(consumerTag, envelope, properties, body);*/
33                 System.out.println("消费者2:" + new String(body));
34                 channel.basicAck(envelope.getDeliveryTag(), false);
35             }
36         };
37         channel.basicConsume(QUEUE, false, defaultConsumer);
38     }
39 }

 

第六种模式是将上述的模式集成其它的框架,进行远程访问,这里我们将集成 Spring 实现 RCP 远程模式的使用

5.Spring 集成 RabbitMQ

5.1.自动集成 Spring

编写spring的配置,此配置文件的目的是将 Spring 与 RabbitMQ 进行整合,实际上就是将 MQ 的相关信息(连接,队列,交换机……)通过XML配置的方式实现

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 4        xsi:schemaLocation="http://www.springframework.org/schema/rabbit
 5        http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
 6        http://www.springframework.org/schema/beans
 7        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
 8     <!--定义连接工厂-->
 9     <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
10                                virtual-host="/test"/>
11     <!--
12      定义模板
13      第三个参数,决定消息发送到哪里,如果为exchange,则发送到交换机;如果为queue,则发送到队列
14     -->
15     <rabbit:template id="template" connection-factory="connectionFactory" exchange="fanoutExchange"/>
16     <rabbit:admin connection-factory="connectionFactory"/>
17     <!--定义队列-->
18     <rabbit:queue name="myQueue" auto-declare="true"/>
19     <!--定义交换机-->
20     <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true">
21         <!--将消息绑定到交换机-->
22         <rabbit:bindings>
23             <rabbit:binding queue="myQueue">
24 
25             </rabbit:binding>
26         </rabbit:bindings>
27     </rabbit:fanout-exchange>
28     <!--定义监听器,收到消息会执行-->
29     <rabbit:listener-container connection-factory="connectionFactory">
30        <!-- 定义监听的类和方法-->
31         <rabbit:listener ref="consumer" method="test" queue-names="myQueue"/>
32     </rabbit:listener-container>
33     <!--定义消费者-->
34     <bean id="consumer" class="com.edu.spring.MyConsumer"/>
35 
36 </beans>

 

生产者:

 1 package com.edu.spring;
 2 
 3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 4 import org.springframework.context.ApplicationContext;
 5 import org.springframework.context.support.ClassPathXmlApplicationContext;
 6 
 7 /**
 8  * @ClassName SpringTest
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 18:40
12  **/
13 public class SpringTest {
14     public static void main(String[] args) throws Exception {
15         ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
16         RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
17         rabbitTemplate.convertAndSend("Spring的消息");
18         ((ClassPathXmlApplicationContext) applicationContext).destroy();
19     }
20 }

消费者

 1 package com.edu.spring;
 2 
 3 /**
 4  * @ClassName MyConsumer
 5  * @Deccription TODO
 6  * @Author DZ
 7  * @Date 2019/5/4 18:35
 8  **/
 9 public class MyConsumer {
10     /*用于接收消息*/
11     public void test(String message) {
12         System.err.println(message);
13     }
14 }
 

集成Spring主要是在xml中实现了队列和交换机的创建。

最好能理解上面的图。理解后,以后写相关的代码,直接去网上 copy 一份配置文件,然后根据自己项目的情况进行修改。如果不能理解,就不知道如何修改出现错误后不知道错误出现在什么地方。

5.2.手动模式

手动模式,主要增加MQ的回调操作,MQ消息失败或者成功就有相应的回调信息,增强系统的健壮性,一旦产生异常,很快就能定位到异常的位置,所以在实际开发中,一般都这种方式

创建xml配置文件

 1 <beans xmlns="http://www.springframework.org/schema/beans"
 2        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
 4        xmlns:context="http://www.springframework.org/schema/context"
 5        xsi:schemaLocation="http://www.springframework.org/schema/rabbit
 6        http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
 7        http://www.springframework.org/schema/beans
 8        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
 9        http://www.springframework.org/schema/context
10        http://www.springframework.org/schema/context/spring-context-4.3.xsd">
11     <context:component-scan base-package="com.edu.spring2"/>
12     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
13 
14     <!--
15     定义连接工厂
16     publisher-confirms为ture,确认失败等回调才会执行
17     -->
18     <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="test" password="test"
19                                virtual-host="/test" publisher-confirms="true"/>
20 
21     <rabbit:admin connection-factory="connectionFactory"/>
22     <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallBackListener"
23                      return-callback="returnCallBackListener"
24                      mandatory="true"/>
25     <!--定义队列-->
26     <rabbit:queue name="myQueue" auto-declare="true"/>
27     <!--定义交换机-->
28     <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX">
29         <!--将消息绑定到交换机-->
30         <rabbit:bindings>
31             <rabbit:binding queue="myQueue">
32 
33             </rabbit:binding>
34         </rabbit:bindings>
35     </rabbit:direct-exchange>
36     <!--定义监听器,收到消息会执行-->
37     <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
38         <!-- 定义监听的类和方法-->
39         <rabbit:listener queues="myQueue" ref="receiveConfirmTestListener"/>
40     </rabbit:listener-container>
41 
42 </beans>
 

创建回调监听函数

 1 package com.edu.spring2;
 2 
 3 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 4 import org.springframework.amqp.rabbit.support.CorrelationData;
 5 import org.springframework.stereotype.Component;
 6 
 7 /**
 8  * @ClassName ConfirmCallBackListener
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 22:26
12  **/
13 @Component("confirmCallBackListener")
14 public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {
15 
16     @Override
17     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
18         System.out.println("确认回调 ack==" + ack + "回调原因==" + cause);
19     }
20 }
 1 package com.edu.spring2;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import org.springframework.amqp.core.Message;
 5 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * @ClassName ReceiveConfirmTestListener
10  * @Deccription TODO
11  * @Author DZ
12  * @Date 2019/5/4 22:24
13  **/
14 @Component("receiveConfirmTestListener")
15 public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
16     /**
17      * 收到消息时,执行的监听
18      *
19      * @param message
20      * @param channel
21      * @throws Exception
22      */
23     @Override
24     public void onMessage(Message message, Channel channel) throws Exception {
25         System.out.println(("消费者收到了消息" + message));
26         channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
27     }
28 }
 1 package com.edu.spring2;
 2 
 3 import org.springframework.amqp.core.Message;
 4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 5 import org.springframework.stereotype.Component;
 6 
 7 /**
 8  * @ClassName ReturnCallBackListener
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 22:28
12  **/
13 @Component("returnCallBackListener")
14 public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {
15     @Override
16     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
17         System.out.println("失败回调" + message);
18     }
19 }

 

回调函数的配置来自 XML

创建发送消息的工具类
 1 package com.edu.spring2;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.stereotype.Component;
 6 
 7 /**
 8  * @ClassName PublicUtil
 9  * @Deccription TODO
10  * @Author DZ
11  * @Date 2019/5/4 22:30
12  **/
13 @Component("publicUtil")
14 public class PublicUtil {
15     @Autowired
16     private AmqpTemplate amqpTemplate;
17 
18     public void send(String excange, String routingkey, Object message) {
19         amqpTemplate.convertAndSend(excange, routingkey, message);
20     }
21 }

 

 

创建测试类

 1 package com.edu.spring2;
 2 
 3 import org.junit.Test;
 4 import org.junit.runner.RunWith;
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.test.context.ContextConfiguration;
 7 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 8 
 9 /**
10  * @ClassName TestMain
11  * @Deccription TODO
12  * @Author DZ
13  * @Date 2019/5/4 22:32
14  **/
15 @RunWith(SpringJUnit4ClassRunner.class)
16 @ContextConfiguration(locations = {"classpath:applicationContext2.xml"})
17 public class TestMain {
18     @Autowired
19     private PublicUtil publicUtil;
20     private static String exChange = "DIRECT_EX";//交换机
21     private static String queue = "myQueue";
22 
23     /**
24      * exChange和queue均正确
25      * confirm会执行,ack = ture
26      * 消息正常接收(接收消息确认方法正常执行)
27      */
28     @Test
29     public void test1() throws Exception {
30         publicUtil.send(exChange, queue, "测试1,队列和交换机均正确");
31     }
32     /**
33      * exChange错误,queue正确
34      * confirm执行,ack=false
35      * 消息无法接收(接收消息确认方法不能执行)
36      */
37     @Test
38     public void test2() throws Exception {
39         publicUtil.send(exChange + "1", queue, "测试2,队列正确,交换机错误");
40     }
41     /**
42      * exChange正常,queue错误
43      * return执行
44      * confirm执行,ack=ture
45      */
46     @Test
47     public void test3() throws Exception {
48         publicUtil.send(exChange, queue + "1", "测试2,队列错误,交换机正确");
49     }
50     /**
51      * exChange错误,queue错误
52      * confirm执行,ack=false
53      */
54     @Test
55     public void test4() throws Exception {
56         publicUtil.send(exChange + "1", queue + "1", "测试2,队列错误,交换机错误");
57     }
58 }
 

测试结果如下:

  • test1:exChange和queue均正确

  • confirm会执行,ack=ture;能正常收到消息(接收消息的方法正常执行)
  • test2:exChange错误,queue正确

confirm执行,ack=false;不能正常接收到消息
  • test3:exChange正确,queue错误
confirm执行,ack=ture;return执行;不能接收到消息
  • test4:exChange和queue均错误
confirm执行,ack=false;不能接收消息

上述结论及代码如下图:

根据上述的测试结果,我们可以根据回调函数的返回结果,查看MQ的错误出现在那里。根据上述结论,我们可以对3个回调函数做如下处理:

  • 类 ReceiveConfirmTestListener 中的onMessage方法主要用于接收从 RabbitMQ 推送过来的消息,并对消息做相应的逻辑处理

  • 类 ConfirmCallBackListener 中的 confirm 方法主要用于检查交换机(exChange),当 ack=false,交换机可能错误

  • 类 ReturnCallBackListener 中的 returnedMessage 方法用于检查队列(queue),当此方法执行时,队列可能错误

所以3个相应的方法可以做如下调整:

实际上,在真实项目中,上面3个方法也是按照这3个逻辑进行设计的。当然这3个方法中还可以加入更多的日志消息,和逻辑处理业务。

6.参考

https://blog.csdn.net/liu911025/article/details/80460182

https://blog.csdn.net/lyhkmm/article/details/78775369

https://blog.csdn.net/vbirdbest/article/details/78670550

https://blog.csdn.net/vbirdbest/article/details/78670550

https://www.rabbitmq.com/getstarted.html


原文链接:https://www.cnblogs.com/dz-boss/p/10817100.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:使用 ServerSocket 进行文件上传,以及用Tomcat启动ServerSocket

下一篇:Java对象创建