RocketMQ(1)--helloworld

2018-06-18 02:47:26来源:未知 阅读 ()

新老客户大回馈,云服务器低至5折

 

双Master方式:

服务器环境

序号 IP 角色 模式
1 192.168.32.135 nameServer1,brokerServer1  Master1
2 192.168.32.136 nameServer2,brokerServer2  Master2

 

 

 

 

Helloworld代码示例:

/**
 * Producer,发送消息
 * 
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;92.168.32.136:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "TagA",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

/**
 * Consumer,订阅消息
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
        consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
//                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                try {
                    for(MessageExt msg : msgs){
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(),"utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:"+" topic:"+topic+" msgBody:"+msgBody+" tags:"+tags);
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}
View Code

启动消费者,生产者,结果如下:

问题:consumeMessage方法的参数是List<MessageExt> msgs,说明消费端是可以一次消费多条消息的,那么其中的一条消息处理失败后,怎么处理呢?

回答:1. 默认情况下消费端一次只消费一条消息,可以通过consumer.setConsumeMessageBatchMaxSize(10),来修改。

   2. 如果是先启动消费端,再启动生产端,即使通过上面设置了,消费端也是一次消费一条消息。

   3. 如果显示启动生产端,再启动消费端,消费端会一次消费多条消息,例如一次拉取了5条消息,如果在第四条失败了,则这五条消息会全部重试,所以这种情况需要在业务逻辑中去去除重复消息。

问题:消息是怎么重试的?

回答:1. 生成端重试:producer.setRetryTimesWhenSendFailed(3);producer.send(msg,1000); 通过前面两行代码能实现生产端重试,意思是如果消息发送超过了超时时间(1000),重发3次

     2.消费端重试:分两种,一是timeout,而是exception

      timeout是指MQ没有收到消费端的返回效应(RECONSUME_LATER/CONSUME_SUCCESS),这是MQ会一直给消费端发送消息重试。

      exception是指消费端在消费的过程中发生异常,给MQ返回了RECONSUME_LATER,这是MQ会按照一定的规则给消费端发送消息重试。

          默认重试规则:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

测试代码示例:

/**
 * Producer,发送消息
 * 
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;92.168.32.136:9876");
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "TagA",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg,1000);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

/**
 * Consumer,订阅消息
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
        consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");
        //默认为1
        consumer.setConsumeMessageBatchMaxSize(10);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
//                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                try {
                    System.out.println("消息条数:" + msgs.size());
                    for(MessageExt msg : msgs){
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(),"utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:"+" topic:"+topic+" msgBody:"+msgBody+" tags:"+tags);
                    
                        if("Hello RocketMQ 4".equals(msgBody)){
                            System.out.println("=====失败消息开始=====");
                            System.out.println(msg);
                            System.out.println(msgBody);
                            System.out.println("=====失败消息结束=====");
                            int i = 6/0;
                        }
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}
View Code

 

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:spring3与jdk8不兼容报错解决方法

下一篇:【JavaEE-面试总结】(未完,待续&#183;&#183;&#183;)