SpringBoot集成rabbitmq(二)
2019-02-25 16:10:01来源:博客园 阅读 ()
前言
在使用rabbitmq时,我们可以通过消息持久化来解决服务器因异常崩溃而造成的消息丢失。除此之外,我们还会遇到一个问题,当消息生产者发消息发送出去后,消息到底有没有正确到达服务器呢?如果不进行特殊配置,默认情况下发送的消息是不会给生产者返回任何响应的,也就是默认情况下生产者并不知道消息是否正常到达了服务器。对于数据必达的需求,你肯定对消息的来龙去脉都有个了接,这种情况下就需要用到rabbitmq消息确认。
消息确认
rabbitmq消息确认分为生产者确认和消费者确认。
生产者消费确认提供了两种机制:
- 通过事务机制实现
- 通过confirm机制实现
事务机制则用到channel.txSelect、channel.txCommit、channel.txRollback。可以参考下面AMQP协议流转过程(参考Rabbitmq实战指南)
事务机制在一条消息发送之后会阻塞发送端,以等待rabbitmq回应,之后才继续发送下一条消息。所以相对来说事务机制的性能要差一些。事务机制会降低rabbitmq的吞吐量,所以又引入了另一种轻量级的方式:confirm机制。
生产者通过调用channel.confirmSelect将信道设置为confirm模式,之后Rabbitmq会返回Confirm.Select-Ok命令表示同意生产者将当前信道设置为confirm模式。所有被发送的后续消息都被ack或nack一次。类似如下代码:
channel.confirmSelect()
channel.basicPublish("exchange","routingkey",null,"test".getBytes())
confirm机制流转过程参考下图(参考Rabbitmq实战指南)
消费者确认
消费者在订阅消息队列时指定autoAck参数。当参数设置为false时rabbitmq会等待消费者显式回复确认信号才会从内存或者磁盘种删除这条消息。参数默认为true。当autoAck设置为false时,对于rabbitmq服务器而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息、一部分是已经投递给消费者的消息但是还没有收到确认信号的消息。可通过RabbitMQ Web平台查看队列中Ready和UnAck对应的数量。
消费者消息确认涉及到3个方法:channel.basicAck、channel.basicNack、channel.basicReject
SpringBoot集成rabbitmq下实现消息确认
springboot集成rabbitmq实现消息确认主要涉及两个回调方法(ReturnCallback、ConfirmCallback)。这里消费者部分我用两种方式来实现。一种是基于SimpleMessageListenerContainer。 另一种就是用RabbitListener注解实现。
1、application.yml
spring: rabbitmq: host: 192.168.80.128 port: 5672 username: admin password: admin virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 10 retry: enabled: true
2、配置文件(这里实现ReturnCallback、ConfirmCallback)
import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; @Configuration public class MqConfig { private Logger logger= LoggerFactory.getLogger(MqConfig.class); @Autowired RabbitTemplate rabbitTemplate; @Autowired ConnectionFactory connectionFactory; @Bean public Queue queue(){ return new Queue("testMq",true); //持久化队列(默认值也是true) } @Bean public DirectExchange directExchange(){ return new DirectExchange("testMq",true,false); } @Bean Binding binding(Queue queue,DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with("testMq"); } /** * i->replyCode * s->replyText * s1->exchange * s2->routingKey * **/ //消息从交换器发送到队列失败时触发 RabbitTemplate.ReturnCallback msgReturnCallback=new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { logger.info("消息:{},错误码:{},失败原因:{},交换器:{},路由key:{}",message.getMessageProperties().getCorrelationId(),i,s,s1,s2); } }; //消息发送到交换器时触发 RabbitTemplate.ConfirmCallback msgConfirmCallback=new RabbitTemplate.ConfirmCallback() { @Override public void confirm(@Nullable CorrelationData correlationData, boolean b, @Nullable String s) { if(b){ logger.info("消息{}发送exchange成功",correlationData.getId()); }else{ logger.info("消息发送到exchange失败,原因:{}",s); } } }; /*** * 消费者确认(方式二) * **/ @Bean public SimpleMessageListenerContainer listenerContainer(){ SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames("testMq"); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(10); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { try{ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); logger.info("接收消息:{}",new String(message.getBody())); }catch (Exception ex){ //channel.basicReject //channel.basicNack } } }); return container; } /** * 生产者的回调都在这里 * **/ @Autowired public RabbitTemplate rabbitTemplate(){ //消息发送失败后返回到队列中 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(msgReturnCallback); rabbitTemplate.setConfirmCallback(msgConfirmCallback); return rabbitTemplate; } }
另一种消费端实现方式
import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class MqConsumer { private Logger logger= LoggerFactory.getLogger(MqConsumer.class); @RabbitListener(queues = "testMq") public void handler(Message message,Channel channel){ try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); logger.info("接收消息:{}",new String(message.getBody())); } catch (IOException e) { e.printStackTrace(); } } }
3、消息生产者
消息发送时注意生成一个消息id。一开始没用到这个参数,在消息接收时消费者会抛空指针异常
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.UUID; @Controller @RequestMapping("/rabbitMq") public class MqController { private Logger logger= LoggerFactory.getLogger(MqController.class); @Autowired RabbitTemplate rabbitTemplate; @RequestMapping("/sendMq") @ResponseBody public String sendMq(){ /** * 这里exchange、routingkey都叫testMq * **/ Object message=null; for(int i=0;i<10;i++){ logger.info("生产者:第{}条消息",i); CorrelationData correlationId=new CorrelationData(UUID.randomUUID().toString()); message="第"+i+"条消息"; rabbitTemplate.convertAndSend("testMq","testMq",message,correlationId); } return "sending..."; } }
从运行截图中可以看到生产者和消费者都收到对应的回调消息。
原文链接:https://www.cnblogs.com/sword-successful/p/10418288.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- springboot2配置JavaMelody与springMVC配置JavaMelody 2020-06-11
- SpringBoot 2.3 整合最新版 ShardingJdbc + Druid + MyBatis 2020-06-11
- 掌握SpringBoot-2.3的容器探针:实战篇 2020-06-11
- nacos~配置中心功能~springboot的支持 2020-06-10
- SpringBoot + Vue + ElementUI 实现后台管理系统模板 -- 后 2020-06-10
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash