RocketMQ(2)
2018-06-18 02:22:38来源:未知 阅读 ()
1. 消费端集群消费(负载均衡)
示例代码:
/** * Producer,发送消息 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("message_producer"); producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { Message msg = new Message("TopicTest",// topic "Tag1",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } } /** * Consumer,订阅消息 */ public class Consumer1 { public Consumer1() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); System.out.println("======暂停====="); Thread.sleep(60000); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer1 consumer1 = new Consumer1(); System.out.println("Consumer1 Started."); } } /** * Consumer,订阅消息 */ public class Consumer2 { public Consumer2() { try { 
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer2 consumer2 = new Consumer2(); System.out.println("Consumer2 Started."); } }
一个生产者,两个消费者,注意两个消费者的组名要一样。
先启动两个消费者(consumer1,consumer2),通过控制台查看如下:
再启动生产者生成100条消息,消费情况如下:
生成的100条消息被consumer1和consumer2平均地消费了。可以通过consumer.setAllocateMessageQueueStrategy去设置分配策略。
BTW:集群消费(MessageModel.CLUSTERING)是默认的模式,可以通过consumer.setMessageModel设置为MessageModel.CLUSTERING或MessageModel.BROADCASTING;如果是广播消费,则每个客户端都会收到生产端的所有消息。
2. 消息未响应会重发
代码示例:
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("message_producer"); producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); producer.start(); for (int i = 0; i < 1; i++) { try { Message msg = new Message("TopicTest",// topic "Tag1",// tag ("Hello RocketMQ " + i).getBytes()// body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } } public class Consumer1 { public Consumer1() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); System.out.println("======暂停====="); Thread.sleep(600000); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer1 consumer1 = new Consumer1(); System.out.println("Consumer1 Started."); } } public class Consumer2 { public Consumer2() { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer"); consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876"); 
consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3"); consumer.registerMessageListener(new Listener()); consumer.start(); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } class Listener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } public static void main(String[] args) throws InterruptedException, MQClientException { Consumer2 consumer2 = new Consumer2(); System.out.println("Consumer2 Started."); } }
先启动consumer1,再启动consumer2,最后启动producer
consumer1收到了消息,consumer2没有收到消息,这时把consumer1强制停止,也就是说consumer1不会给MQ返回响应,查看结果:
consumer2也收到消息了,说明在MQ没收到消费端响应的情况下,会重发消息。
3. 修改topic的队列数
默认的队列数是4个,可以从执行结果中看出:queueId都是0-3
细节可以看https://www.cnblogs.com/dyfh/p/4113677.html
可以增加设置producer.createTopic("TopicTest", "TopicTest", 8);
标签:
版权申明:本站文章部分来自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- 数据源管理 | Kafka集群环境搭建,消息存储机制详解 2020-06-11
- Dubbo+Zookeeper集群案例 2020-06-09
- RocketMQ4.4 入门进阶+实战 2020-06-08
- 【从单体架构到分布式架构】(二)请求增多,单点变集群(1) 2020-06-07
- 多线程:生产者消费者(管程法、信号灯法) 2020-06-01
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash