SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ…
2020-04-28 16:08:00来源:博客园 阅读 ()
SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic
一丶简介
Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
业务场景:
1.日志服务器记录用户服务、商品服务、订单服务三个服务。
2.日志服务器有三个日志服务:INFO日志处理服务、ERROR日志处理服务、全日志处理服务。
3.使用Topic交换器处理日志,匹配规则依次为:*.log.info、*.log.error和*.log.*。
二丶配置文件
还是创建两个项目,一个作为生产者一个作为消费者。
生产者配置:
server.port=8883 spring.application.name=hello-world spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.thymeleaf.cache=false 设置交换器名称 mq.config.exchange=log.topicView Code
消费者配置:
server.port=8884 spring.application.name=lesson1 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #设置交换器名称 mq.config.exchange=log.topic #info队列名称 mq.config.queue.info=log.info #error队列名称 mq.config.queue.error=log.error #log队列名称 mq.config.queue.logs=log.allView Code
三丶创建生产者
1.订单服务
package com.example.amqptopicprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:模拟订单服务发送消息 */ @Component public class OrderSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交换器 @Value("${mq.config.exchange}") private String exChange; /** * 发送消息的方法 * @param msg */ public void send(String msg){ //向消息队列发送消息 //参数1:队列名称 //参数2:消息 // this.amqpTemplate.convertAndSend("hello-queue",msg); //向消息队列发送消息 //参数1:交换器名称 //参数2:路由键 //参数3:消息 this.amqpTemplate.convertAndSend(exChange,"order.log.debug","order.log.debug-"+msg); this.amqpTemplate.convertAndSend(exChange,"order.log.info","order.log.info-"+msg); this.amqpTemplate.convertAndSend(exChange,"order.log.warn","order.log.warn-"+msg); this.amqpTemplate.convertAndSend(exChange,"order.log.error","order.log.error-"+msg); } }View Code
2.商品服务
package com.example.amqptopicprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:模拟商品服务发送消息 */ @Component public class ProductSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交换器 @Value("${mq.config.exchange}") private String exChange; /** * 发送消息的方法 * @param msg */ public void send(String msg){ //向消息队列发送消息 //参数1:队列名称 //参数2:消息 // this.amqpTemplate.convertAndSend("hello-queue",msg); //向消息队列发送消息 //参数1:交换器名称 //参数2:路由键 //参数3:消息 this.amqpTemplate.convertAndSend(exChange,"product.log.debug","product.log.debug-"+msg); this.amqpTemplate.convertAndSend(exChange,"product.log.info","product.log.info-"+msg); this.amqpTemplate.convertAndSend(exChange,"product.log.warn","product.log.warn-"+msg); this.amqpTemplate.convertAndSend(exChange,"product.log.error","product.log.error-"+msg); } }View Code
3.用户服务
package com.example.amqptopicprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:模拟用户服务发送消息 */ @Component public class UserSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交换器 @Value("${mq.config.exchange}") private String exChange; /** * 发送消息的方法 * @param msg */ public void send(String msg){ //向消息队列发送消息 //参数1:队列名称 //参数2:消息 // this.amqpTemplate.convertAndSend("hello-queue",msg); //向消息队列发送消息 //参数1:交换器名称 //参数2:路由键 //参数3:消息 this.amqpTemplate.convertAndSend(exChange,"user.log.debug","user.log.debug-"+msg); this.amqpTemplate.convertAndSend(exChange,"user.log.info","user.log.info-"+msg); this.amqpTemplate.convertAndSend(exChange,"user.log.warn","user.log.warn-"+msg); this.amqpTemplate.convertAndSend(exChange,"user.log.error","user.log.error-"+msg); } }View Code
四丶创建消费者
1.ERROR日志处理服务
package com.ant.amqptopicconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:绑定队列 * @QueueBinding value:绑定队列的名称 * exchange:配置交换器 * @Queue : value:配置队列名称 * autoDelete:是否是一个可删除的临时队列 * @Exchange value:为交换器起个名称 * type:指定具体的交换器类型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.error" ) ) public class TopicErrorReceiver { /** * 接收消息的方法,采用消息队列监听机制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("error-receiver:"+msg); } }View Code
2.INFO日志处理服务
package com.ant.amqptopicconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:绑定队列 * @QueueBinding value:绑定队列的名称 * exchange:配置交换器 * @Queue : value:配置队列名称 * autoDelete:是否是一个可删除的临时队列 * @Exchange value:为交换器起个名称 * type:指定具体的交换器类型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.info" ) ) public class TopicInfoReceiver { /** * 接收消息的方法,采用消息队列监听机制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("info-receiver:"+msg); } }View Code
3.全日志处理服务
package com.ant.amqptopicconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * * @RabbitListener bindings:绑定队列 * @QueueBinding value:绑定队列的名称 * exchange:配置交换器 * key:路由键 * @Queue : value:配置队列名称 * autoDelete:是否是一个可删除的临时队列 * @Exchange value:为交换器起个名称 * type:指定具体的交换器类型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.*" ) ) public class TopicLogReceiver { /** * 接收消息的方法,采用消息队列监听机制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("all-receiver:"+msg); } }View Code
五丶老规矩测试一发
package com.example.amqp; import com.example.ampq.Sender; import com.example.amqptopicprovider.OrderSender; import com.example.amqptopicprovider.ProductSender; import com.example.amqptopicprovider.UserSender; import com.example.helloworld.HelloworldApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * Author:aijiaxiang * Date:2020/4/26 * Description: */ @RunWith(SpringRunner.class) @SpringBootTest(classes = HelloworldApplication.class) public class QueueTest { @Autowired private Sender sender; @Autowired private UserSender userSender; @Autowired private ProductSender productSender; @Autowired private OrderSender orderSender; /** * 测试消息队列 */ // @Test // public void test1() throws InterruptedException { // while (true){ // Thread.sleep(1000); // sender.send("hello"); // } // // } @Test public void test2(){ userSender.send("usersend"); productSender.send("productsend"); orderSender.send("ordersend"); } }View Code
注:日志服务处理类中的路由键是直接采用了硬编码的方式进行配置,主要是为了方便查看一目了然,但是还是推荐将路由键配置在配置文件中,使用 "${}" 这个方式来进行读取。
原文链接:https://www.cnblogs.com/aijiaxiang/p/12797512.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- 学习Java 8 Stream Api (4) - Stream 终端操作之 collect 2020-06-11
- java学习之第一天 2020-06-11
- Java学习之第二天 2020-06-11
- Spring WebFlux 学习笔记 - (一) 前传:学习Java 8 Stream Ap 2020-06-11
- Java笔记:集合 2020-06-10
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash