RocketMQ——消息ACK机制及消费进度管理_rocketmqmessagelistener ack-程序员宅基地

技术标签: mq  

RocketMQ——消息ACK机制及消费进度管理

2017-01-25 WED 20:49

RokectMQ——水平扩展及负载均衡详解 中剖析过,consumer的每个实例是靠队列分配来决定如何消费消息的。那么消费进度具体是如何管理的,又是如何保证消息成功消费的?(RocketMQ有保证消息肯定消费成功的特性,失败则重试)?

本文将详细解析消息具体是如何ack的,又是如何保证消费肯定成功的。

由于以上工作所有的机制都实现在PushConsumer中,所以本文的原理均只适用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer。 若使用了PullConsumer模式,类似的工作如何ack,如何保证消费等均需要使用方自己实现。

注:广播消费和集群消费的处理有部分区别,以下均特指集群消费(CLSUTER),广播(BROADCASTING)下部分可能不适用。

保证消费成功

PushConsumer为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

消费的时候,我们需要注入一个消费回调,具体sample代码如下:

    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
            doMyJob();//执行真正消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的。(具体如何ACK见后面章节)

如果这时候消息消费失败,例如数据库异常,余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。

为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费租的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。

注:

  1. 如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
  2. 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

启动的时候从哪里消费

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

所以,社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET,历史的消息还是被消费了”? 原因就在于只有全新的消费组才会使用到这些策略,老的消费组都是按已经存储过的消费进度继续消费。

对于老消费组想跳过历史消息需要自身做过滤,或者使用先修改消费进度。示例代码请参看:RocketMQ——消息文件过期原理

消息ACK机制

RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度。

如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。

每次消息成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度。

但是每次记录消费进度的时候,只会把一批消息中最小的offset值为消费进度值,如下图:

message ack

这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

对于这个场景,RocketMQ暂时无能为力,所以业务必须要保证消息消费的幂等性,这也是RocketMQ官方多次强调的态度。

实际上,从源码的角度上看,RocketMQ可能是考虑过这个问题的,截止到3.2.6的版本的源码中,可以看到为了缓解这个问题的影响面,DefaultMQPushConsumer中有个配置consumeConcurrentlyMaxSpan

/**
 * Concurrently max span offset.it has no effect on sequential consumption
 */
private int consumeConcurrentlyMaxSpan = 2000;

这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个阈值就不再拉取消息。

但作用实际很有限,像刚刚这个例子,2101的消费是死循环,其他消费非常正常的话,是无能为力的。一旦退出,在不人工干预的情况下,2101后所有消息全部重复!

Ack卡进度解决方案

实际上对于卡住进度的场景,可以选择弃车保帅的方案:把消息卡住那些消息,先ack掉,让进度前移。但要保证这条消息不会因此丢失,ack之前要把消息sendBack回去,这样这条卡住的消息就会必然重复,但会解决潜在的大量重复的场景。 这也是我们公司自己定制的解决方案。

部分源码如下:

class ConsumeRequestWithUnAck implements Runnable {
    final ConsumeRequest consumeRequest;
    final long resendAfterIfStillUnAck;//n毫秒没有消费完,就重发

    ConsumeRequestWithUnAck(ConsumeRequest consumeRequest,long resendAfterIfStillUnAck) {
        this.consumeRequest = consumeRequest;
        this.resendAfterIfStillUnAck = resendAfterIfStillUnAck;
    }

    @Override
    public void run() {
        //每次消费前,计划延时任务,超时则ack并重发
        final WeakReference<ConsumeRequest> crReff = new WeakReference<>(this.consumeRequest);
        ScheduledFuture scheduledFuture=null;
        if(!ConsumeDispatcher.this.ackAndResendScheduler.isShutdown()) {
            scheduledFuture= ConsumeDispatcher.this.ackAndResendScheduler.schedule(new ConsumeTooLongChecker(crReff),resendAfterIfStillUnAck,TimeUnit.MILLISECONDS);
        }
        try{
            this.consumeRequest.run();//正常执行并更新offset
        }
        finally {
            if (scheduledFuture != null) scheduledFuture.cancel(false);//消费结束后,取消任务
        }
    }

}
  1. 定义了一个装饰器,把原来的ConsumeRequest对象包了一层。
  2. 装饰器中,每条消息消费前都会调度一个调度器,定时触发,触发的时候如果发现消息还存在,就执行sendback并ack的操作。

后来RocketMQ显然也发现了这个问题,RocketMQ在3.5.8之后也是采用这样的方案去解决这个问题。只是实现方式上有所不同(事实上我认为RocketMQ的方案还不够完善)

  1. 在pushConsumer中 有一个consumeTimeout字段(默认15分钟),用于设置最大的消费超时时间。消费前会记录一个消费的开始时间,后面用于比对。
  2. 消费者启动的时候,会定期扫描所有消费的消息,达到这个timeout的那些消息,就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔。

核心源码如下:

//ConsumeMessageConcurrentlyService.java
public void start() {
    this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            cleanExpireMsg();
        }

    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}

//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }

    int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
    for (int i = 0; i < loop; i++) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                    msg = msgTreeMap.firstEntry().getValue();
                } else {

                    break;
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
        }

        try {

            pushConsumer.sendMessageBack(msg, 3);
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            msgTreeMap.remove(msgTreeMap.firstKey());
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}

通过这个逻辑对比我定制的时间,可以看出有几个不太完善的问题:

  1. 消费timeout的时间非常不精确。由于扫描的间隔是15分钟,所以实际上触发的时候,消息是有可能卡住了接近30分钟(15*2)才被清理。
  2. 由于定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会生效。

 2017-01-25 Wed 20:49  javarocketmq

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/linuxheik/article/details/79579329

智能推荐

canal 全量/增量数据同步说明_canal 全量同步数据-程序员宅基地

文章浏览阅读7.4k次,点赞3次,收藏8次。一、日志文件完整1、全量数据同步1、修改\canal.deployer-1.1.5\conf\example下的instance.properties通过以下三个配置实现canal全量数据同步# mysql日志文件canal.instance.master.journal.name=mysql-bin.000001# 获取日志的起始位置canal.instance.master.position=0# 获取日志的起始时间戳canal.instance.master.timestamp=16_canal 全量同步数据

法语电话面试程序员面试问题和汇总-程序员宅基地

文章浏览阅读885次,点赞24次,收藏18次。法语电话面试程序员面试问题和汇总

全金属vivaldi天线设计----学习笔记_(1)vivaldi天线(ava)设计-程序员宅基地

文章浏览阅读1.1k次,点赞3次,收藏19次。全金属 Vivaldi 天线单元为宽频带单元,其在自由空间中孤立单元的特性与构成阵列的阵元的特性是不同的。_(1)vivaldi天线(ava)设计

springboot-开机自启功能实现-CommandLineRunner和ApplicationRunner_springboot 开机自启动-程序员宅基地

文章浏览阅读212次。区别:1.两个接口的实现方法一样,参数不一样,其他没什么区别。两个参数都可以接收java命令设置的参数及值2.ApplicationRunner接口的实现方法比CommandLineRunner接口的实现方法前执行(也可通过设置@Order决定谁先执行)作用:满足springBoot框架需要预加载数据需求,执行操作的时间是在容器启动末尾时间执行操作。_springboot 开机自启动

vsCode 快捷键-程序员宅基地

文章浏览阅读45次。记住常用的快捷键,对开发来说,简直是行云流水,心里无比顺畅。按 Press功能 FunctionCtrl + Shift + P,F1显示命令面板 Show Command PaletteCtrl + P快速打开 Quick OpenCtrl + Shift + N新窗口/实例 New window/instanceCtr...

Devin:全球首个AI程序员!自主学习,写代码,查bug!_devin ai-程序员宅基地

文章浏览阅读2k次,点赞30次,收藏22次。Devin:全球首个AI程序员!自主学习,写代码,查bug!_devin ai

随便推点

手机python编程软件哪个好,手机python编程软件app-程序员宅基地

文章浏览阅读885次,点赞16次,收藏20次。正确的用法,应该就是学编程的时候,用来练习练习,倒是一个不错的好选择,或者自己有些小项目,拿来码码代码什么的。由于内置了SL4A,可以很方便的调用安卓操作系统的一些API做些有趣的事情,比如可以通过SL4A获取手机地理位置,打开蓝牙,发送手机短信,打开手机摄像头等等。Qpython是一个Python引擎,只能运行在安卓系统上,内置了一个Python编辑器,可以直接在手机上写Python代码,支持缩进,语法高亮等特性。这是编程IDE的标配,可以执行一些代码片段,不过写手机上输入代码还是挺麻烦的。

Day01:Web应用&架构搭建&站库分离&路由访问&配置受限&DNS解析_架构搭建-站库分离-路由访问-配置受限-dns解析-程序员宅基地

文章浏览阅读891次,点赞14次,收藏17次。小迪安全-Day01:Web应用&架构搭建&站库分离&路由访问&配置受限&DNS解析_架构搭建-站库分离-路由访问-配置受限-dns解析

计算机毕业设计(附源码)python中小学家校通系统_家校一体化教育系统的开发python-程序员宅基地

文章浏览阅读235次。考试成绩管理,在考试成绩管理页面中可以对索引、家长账号、家长姓名、学号、学生姓名、班级、学年、学期、科目、考试成绩、等级、教师评语、教师工号、教师姓名等信息进行详情,修改或删除等详细操作,如图5-9所示。家长管理,在家长管理页面中可以对索引、家长账号、家长姓名、性别、头像、学号、学生姓名、班级、年龄、关系、家长手机、家长邮箱等信息进行详情、修改或删除等详细操作,如图5-5所示。个人中心,在个人中心页面通过填写家长账号、家长姓名、性别、头像、学号、学生姓名、班级、年龄、关系、家长手机、家长邮箱等信息进行。_家校一体化教育系统的开发python

修改Matlab的背景颜色_matlab背景颜色-程序员宅基地

文章浏览阅读1.9k次。然后,使用"set"函数和"gcf"参数来设置当前图形窗口的属性。总结一下,通过使用Matlab的"figure"函数和"Color"属性,我们可以轻松修改Matlab的背景颜色。本文将详细介绍如何使用Matlab代码修改背景颜色。需要注意的是,上述代码中的"set"函数将背景颜色设置为当前图形窗口的属性。如果你想要修改特定的图形对象或图形句柄的背景颜色,可以将"set"函数中的"gcf"替换为相应的图形对象或图形句柄。要修改Matlab的背景颜色,我们可以使用"figure"函数和"Color"属性。_matlab背景颜色

Ubuntu14.04和16.04官方默认更新源sources.list和第三方源推荐(干货!)_ubuntu16.04 源 source.list-程序员宅基地

文章浏览阅读6.9k次,点赞2次,收藏14次。写在前面:笔者由于还在学校学习,学校没有开发给Linux用的上网客户端,所以只能用在windows系统中通过安装虚拟机运行linux比较方便,但没有外网,只有学校的教育网,所以我需要将ubuntu的默认源修改为教育网中的资源才可以完美运行ubuntu,当然这个教程也适用于修改为非教育源。 第一步,备份官方的默认源  避免自己手贱操作失误,重装系统太费时间cp /etc/apt/sources.li..._ubuntu16.04 源 source.list

devops资料大全-程序员宅基地

文章浏览阅读652次。备份备份软件Amanda -客户端-服务器模型备份工具Bacula - 另一个客户端-服务器模型备份工具Backupninja -轻量级,可扩展的元数据备份系统Backuppc -客户端-服务器模型备份工具和文件共享方案。Burp -网络备份和还原程序Duplicity -使用rsync算法加密的带宽-效率备份Lsyncd -监控一个本地目录树的变..._devops 大全