合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
Orderly The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead. 消费者将锁定每个MessageQueue,以确保它按顺序逐一消费。 这会导致性能损失,但在关心消息顺序时非常有用。 不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。 注意,对于 consumer 而言,在暂时无法成功处理消息时,需要返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,这样就会在一段时间之后重试消费消息。 SUSPEND_CURRENT_QUEUE_A_MOMENT 延缓多长时间执行,在当前队列里 ~~~ else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } ~~~ 4.本地消费的事务控制,ConsumeOrderlyStatus.SUCCESS(提交),ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起一会再消费),在此之前还有一个变量ConsumeOrderlyContext context的setAutoCommit()是否自动提交。 当SUSPEND_CURRENT_QUEUE_A_MOMENT时,autoCommit设置为true或者false没有区别,本质跟消费相反,把消息从msgTreeMapTemp转移回msgTreeMap,等待下次消费。 当SUCCESS时,autoCommit设置为true时比设置为false多做了2个动作,consumeRequest.getProcessQueue().commit()和this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); ProcessQueue.commit() :本质是删除msgTreeMapTemp里的消息,msgTreeMapTemp里的消息在上面消费时从msgTreeMap转移过来的。 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset() :本质是把拉消息的偏移量更新到本地,然后定时更新到broker。 那么少了这2个动作会怎么样呢,随着消息的消费进行,msgTreeMapTemp里的消息堆积越来越多,消费消息的偏移量一直没有更新到broker导致consumer每次重新启动后都要从头开始重复消费。 就算更新了offset到broker,那么msgTreeMapTemp里的消息堆积呢?不知道这算不算bug。 所以,还是把autoCommit设置为true吧。 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。