合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
[TOC] ## 1. 消息出来 topic:队列的集合称为topic ## 1.1 Simple 消息直接发送,无法保证 ## 1.2 Order ### 1.2.1 使用场景 > 1. 一个生产者可以发送消息给多给topic > 2. 一个topic默认有4个队列 > 3. producer以roundrobin(轮询)的方式给多个队列发送消息 > 4. 同一个队列消息遵守FIFO > * 顺序消费: > 例如在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。 > RocketMQ可以保证顺序消费,他的实现是生产者将这个三个消息放在topic的一个队列里面,单机支持上万个持久化队列,消费端去消费的时候也是只能有一个Consumer去取得这个队列里面的数据,然后顺序消费。 > * rocketmq的顺序消息需要满足2点: > 1.Producer端保证发送消息有序,且发送到同一个队列。 > 2.consumer端只能让一个consumer保证消费同一个队列。 ### 1.2.2 使用场景 如何实现 ### 1.2.3 使用场景producer顺序发送消息到同一队列 > 1. 默认的情况下,producer会向topic(队列的集合,默认四个队列)中的队列轮询式的发生消息,这就不满足顺序消费一系列消息发送到一个队列的要求,所以要修改向队列发送消息的方法。 > 2. 重写MessageQueueSelector,从字面理解就是消息队列选择器,非常的贴切!原理就是在队列数量不变的情况下,通过一系列事务的编号(订单id)和队列叔取模 ~~~ SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 3); # 这里的3用于取模运算,相同编号的数据会路由到同一个队列当中去 ~~~ 这里设置编号为1 ![](https://box.kancloud.cn/e55d1a164eae168f29585825ddc278a9_1737x549.png) 这里设置编号为3,验证了topic默认四个队列,且可以指定消息用于取模的id ![](https://box.kancloud.cn/f01f5866332953af0a98498f32d1f5de_1139x484.png) 以上可以保证同一系列事务被发送到了一个队列当中。 ### 1.2.4 使用场景 某一个Consumer顺序消费同一个队列 通过设置Listener实现 1. MessageListenerOrderly(有序的) 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列 自动实现顺序消费 ~~~ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 设置自动提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",内容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); ~~~ 2. MessageListenerConcurrently(无序的) 需要把线程池改为单线程模式。 > 1. ConsumeMessageOrderlyService类的start()方法,如果是集群消费,则启动定时任务,定时向broker发送批量锁住当前正在消费的队列集合的消息,具体是consumer端拿到正在消费的队列集合,发送锁住队列的消息至broker,broker端返回锁住成功的队列集合。 > consumer收到后,设置是否锁住标志位。 > 这里注意2个变量: > consumer端的RebalanceImpl里的ConcurrentHashMap processQueueTable,是否锁住设置在ProcessQueue里。 > broker端的RebalanceLockManager里的ConcurrentHashMap> mqLockTable,这里维护着全局队列锁。 > 2. ConsumeMessageOrderlyService.ConsumeRequest的run方法是消费消息,这里还有个MessageQueueLock messageQueueLock,维护当前consumer端的本地队列锁。保证当前只有一个线程能够进行消费。 > 3. 拉到消息存入ProcessQueue,然后判断,本地是否获得锁,全局队列是否被锁住,然后从ProcessQueue里取出消息,用MessageListenerOrderly进行消费。 > 拉到消息后调用ProcessQueue.putMessage(final List msgs) 存入,具体是存入TreeMap msgTreeMap。 > 然后是调用ProcessQueue.takeMessags(final int batchSize)消费,具体是把msgTreeMap里消费过的消息,转移到TreeMap msgTreeMapTemp。 > 4. 本地消费的事务控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起一会再消费),在此之前还有一个变量ConsumeOrderlyContext context的setAutoCommit()是否自动提交。 > 当SUSPEND_CURRENT_QUEUE_A_MOMENT时,autoCommit设置为true或者false没有区别,本质跟消费相反,把消息从msgTreeMapTemp转移回msgTreeMap,等待下次消费。 > 当SUCCESS时,autoCommit设置为true时比设置为false多做了2个动作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); > ProcessQueue.commit() :本质是删除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消费时从msgTreeMap转移过来的。 > this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。 > 那么少了这2个动作会怎么样呢,随着消息的消费进行,msgTreeMapTemp里的消息堆积越来越多,消费消息的偏移量一直没有更新到broker导致consumer每次重新启动后都要从头开始重复消费。 > 就算更新了offset到broker,那么msgTreeMapTemp里的消息堆积呢?不知道这算不算bug。 > 所以,还是把autoCommit设置为true比较好。 ## 2. 生产中的使用 ### 2.1 使用注意事项 > 1. 消费者处理MQ消息时必须幂等性(即无论接收到多少相同的消息,执行后的结果一致),如果不具有幂等性,则转换成幂等性处理方法; > 2. 业务方自己保证每条发送到RocketMQ消息都有唯一的ID,这样消费者根据消息的唯一ID去重,并确保消息处理成功。 ### 2.2 java 交互RocketMQ #### 2.2.1 producer ~~~ package com.aixin.lovetocar.rocketmq.util; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import java.util.List; /** * Created by dailin on 2018/4/25. */ public class RocketMQProducer { private static DefaultMQProducer defaultMQProducer; /** * @param groupName 指定producer组 * @param nameServer namerserver地址 */ public RocketMQProducer(String groupName, String nameServer) throws MQClientException { defaultMQProducer = new DefaultMQProducer(groupName); defaultMQProducer.setNamesrvAddr(nameServer); defaultMQProducer.start(); //producer开始 } /** * 同步发送消息 * * @param topic * @param tags * @param key * @param data * @throws Exception */ public void sentSynData(String topic, String tags, String key, String data) throws Exception { Message msg = new Message(topic, tags, key, data.getBytes()); //封装消息 SendResult sendResult = defaultMQProducer.send(msg); //发送消息 System.out.printf("%s%n", sendResult); } /** * 同步发送消息 * * @param topic * @param tags * @param data * @throws MQClientException * @throws RemotingException * @throws InterruptedException * @throws MQBrokerException */ public void sentSynData(String topic, String tags, String data) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { Message msg = new Message(topic, tags, data.getBytes()); //封装消息 SendResult sendResult = defaultMQProducer.send(msg); //发送消息 System.out.printf("%s%n", sendResult); } /** * 发送顺序消息 * * @param topic * @param tags * @param data * @param order * @throws InterruptedException * @throws RemotingException * @throws MQClientException * @throws MQBrokerException */ public void sentOrderDate(String topic, String tags, String key, String data, Integer order) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message msg = new Message(topic, tags, key, data.getBytes()); SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, order); System.out.println(sendResult); } /** * 发送顺序消息 * * @param topic * @param tags * @param data * @param order * @throws InterruptedException * @throws RemotingException * @throws MQClientException * @throws MQBrokerException */ public void sentOrderDate(String topic, String tags, String data, Integer order) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message msg = new Message(topic, tags, data.getBytes()); //队列选择 SendResult sendResult = defaultMQProducer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, order); System.out.println(sendResult); } public DefaultMQProducer getDefaultMQProducer() { return defaultMQProducer; } /** * 关闭producer与RocketMQ的连接 */ public void shudownProducer() { defaultMQProducer.shutdown(); } } ~~~