说说MQ之RocketMQ(二)
2018-10-16 来源:importnew
RocketMQ 的 Java API
RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供的功能包括,
- 广播消费,这个在之前已经提到过;
- 消息过滤,支持简单的 Message Tag 过滤,也支持按 Message Header、body 过滤;
- 顺序消费和乱序消费,之前也提到过,这里的顺序消费应该指的是普通顺序性,这一点与 Kafka 相同;
- Pull 模式消费,这个是相对 Push 模式来说的,Kafka 就是 Pull 模式消费;
- 事务消息,这个好像没有开源,但是 example 代码中有示例,总之,不推荐用;
- Tag,RocketMQ 在 Topic 下面又分了一层 Tag,用于表示消息类别,可以用来过滤,但是顺序性还是以 Topic 来看;
单看功能的话,即使不算事务消息,也不算 Tag,RocketMQ 也远超 Kafka,Kafka 应该只实现了 Pull 模式消费 + 顺序消费这2个功能。RocketMQ 的代码示例在 rocketmq-example 中,注意,代码是不能直接运行的,因为所有的代码都少了设置 name server 的部分,需要自己手动加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");
。
先来看一下生产者的 API,比较简单,只有一种,如下,
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import java.util.List; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.232.23:9876"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("TopicTest1",// topic "TagA",// tag "OrderID188",// key ("RocketMQ "+String.format("%05d", i)).getBytes());// body SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, i)); System.out.println(String.format("%05d", i)+sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
可以发现,相比 Kafka 的 API,只多了 Tag,但实际上行为有很大不同。Kafka 的生产者客户端,有同步和异步两种模式,但都是阻塞模式,send
?方法返回发送状态的?Future
,可以通过?Future
?的?get
?方法阻塞获得发送状态。而 RocketMQ 采用的是同步非阻塞模式,发送之后立刻返回发送状态(而不是?Future
)。正常情况下,两者使用上差别不大,但是在高可用场景中发生主备切换的时候,Kafka 的同步可以等待切换完成并重连,最后返回;而 RocketMQ 只能立刻报错,由生产者选择是否重发。所以,在生产者的 API 上,其实 Kafka 是要强一些的。
另外,RocketMQ 可以通过指定?MessageQueueSelector
?类的实现来指定将消息发送到哪个分区去,Kafka 是通过指定生产者的?partitioner.class
?参数来实现的,灵活性上 RocketMQ 略胜一筹。
再来看消费者的API,由于 RocketMQ 的功能比较多,我们先看 Pull 模式消费的API,如下,
import java.util.HashMap; import java.util.Map; import java.util.Set; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.consumer.store.OffsetStore; import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; public class PullConsumer { private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.start(); Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1"); for (MessageQueue mq : mqs) { System.out.println("Consume from the queue: " + mq); SINGLE_MQ: while (true) { try { long offset = consumer.fetchConsumeOffset(mq, true); PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (null != pullResult.getMsgFoundList()) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { System.out.print(new String(messageExt.getBody())); System.out.print(pullResult); System.out.println(messageExt); } } putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: // TODO break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } private static void putMessageQueueOffset(MessageQueue mq, long offset) { offseTable.put(mq, offset); } private static long getMessageQueueOffset(MessageQueue mq) { Long offset = offseTable.get(mq); if (offset != null) return offset; return 0; } }
这部分的 API 其实是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分区,而 Kafka 可以自动管理(当然也可以手动管理),并且不需要指定分区(分区是在 Kafka 订阅的时候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用?OffsetStore
?接口,提供了两种管理方式,本地文件和远程 Broker。这部分感觉两者差不多。
下面再看看 Push 模式顺序消费,代码如下,
import java.util.List; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
虽然提供了 Push 模式,RocketMQ 内部实际上还是 Pull 模式的 MQ,Push 模式的实现应该采用的是长轮询,这点与 Kafka 一样。使用该方式有几个注意的地方,
- 接收消息的监听类要使用?
MessageListenerOrderly
; ConsumeFromWhere
?有几个参数,表示从头开始消费,从尾开始消费,还是从某个 TimeStamp 开始消费;- 可以控制 offset 的提交,应该就是?
context.setAutoCommit(false);
?的作用;
控制 offset 提交这个特性非常有用,某种程度上扩展一下,就可以当做事务来用了,看代码?ConsumeMessageOrderlyService
?的实现,其实并没有那么复杂,在不启用 AutoCommit 的时候,只有返回?COMMIT
?才 commit offset;启用 AutoCommit 的时候,返回?COMMIT
、ROLLBACK
(这个比较扯)、SUCCESS
?的时候,都 commit offset。
后来发现,commit offset 功能在 Kafka 里面也有提供,使用新的 API,调用?consumer.commitSync
。
再看一个 Push 模式乱序消费 + 消息过滤的例子,消费者的代码如下,
import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); consumer.setNamesrvAddr("192.168.232.23:9876"); consumer.subscribe("TopicTest1", MessageFilterImpl.class.getCanonicalName()); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
这个例子与之前顺序消费不同的地方在于,
- 接收消息的监听类使用的是?
MessageListenerConcurrently
; - 回调方法中,使用的是自动 offset commit;
- 订阅的时候增加了消息过滤类?
MessageFilterImpl
;
消息过滤类?MessageFilterImpl
?的代码如下,
import com.alibaba.rocketmq.common.filter.MessageFilter; import com.alibaba.rocketmq.common.message.MessageExt; public class MessageFilterImpl implements MessageFilter { @Override public boolean match(MessageExt msg) { String property = msg.getUserProperty("SequenceId"); if (property != null) { int id = Integer.parseInt(property); if ((id % 3) == 0 && (id > 10)) { return true; } } return false; } }
RocketMQ 执行过滤是在 Broker 端,Broker 所在的机器会启动多个 FilterServer 过滤进程;Consumer 启动后,会向 FilterServer 上传一个过滤的 Java 类;Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。这种过滤方法可以节省网络流量,但是增加了 Broker 的负担。可惜我没有实验出来使用过滤的效果,即使是用 github wiki 上的例子8也没成功,不纠结了。RocketMQ 的按 Tag 过滤的功能也是在 Broker 上做的过滤,能用,是个很方便的功能。
还有一种广播消费模式,比较简单,可以去看代码,不再列出。
总之,RocketMQ 提供的功能比较多,比 Kafka 多很多易用的 API。
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点!
本站所提供的图片等素材,版权归原作者所有,如需使用,请与原作者联系。
上一篇:说说MQ之RocketMQ(三)
下一篇:携程实时用户行为系统实践