rocketmq延迟队列原理_rocketmq延迟队列原理

rocketmq延迟队列原理_rocketmq延迟队列原理在java的延迟队列中,无法支持集群的延迟。Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。~那就是RocketMQ延时队列。RocketMQ将延时队列的延时延时时间分为18个级别123456789101112131415161718分别对应下面的延迟时间,在使用时,直接传递level即可。mess

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

在java的延迟队列中,无法支持集群的延迟。
Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。
今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。 ~ 那就是 RocketMQ 延时队列。

RocketMQ将延时队列的延时延时时间分为18个级别

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 分别对应下面的延迟时间,在使用时,直接传递 level即可。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
当然这个时间可以自己修改,如果不维护 则按照默认的

在发送MQ消息的时候只需要设置

Message.setDelayTimeLevel(delayLevel);

Jetbrains全家桶1年46,售后保障稳定

MQ发送的代码:

public class DelayMQProducerTest { 
   
    public static void main(String[] args) throws MQClientException, InterruptedException { 
   
        DefaultMQProducer producer = new DefaultMQProducer("delay_test_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try { 
   
            for (int i = 0; i < 3; i++) { 
   
                Message msg = new Message("Topic_Delay_Test",// topic
                        "Tag_Delay",// tag
                        (new Date() + "Topic_Delay_Test" + i).getBytes()// body
                );
                msg.setDelayTimeLevel(2); // 设置延迟级别为2 也就是 5s 
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) { 
   
            e.printStackTrace();
        }
       producer.shutdown();
    }
 
}

接下来就跟进到代码里看是RocketMQ是如何是做到延迟发送消息的。

本人使用的是rocketMQ 4.2 下载地址

进入Message可以看到两个方法:


  // 获取延迟等级
    public int getDelayTimeLevel() { 
   
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) { 
   
            return Integer.parseInt(t);
        }

        return 0;
    }
    // 设置延迟等级
    public void setDelayTimeLevel(int level) { 
   
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

既然设置方法可以看到,那通过获取Level追看哪里使用,然后研究对应的实现。
获取Level的代码位置
在这里插入图片描述
在这里将topic和queueId替换为延迟队列的队列(SCHEDULE_TOPIC_XXXX),这样就保证消息不会立即被发送出去。 而是经过SCHEDULE_TOPIC_XXXX的特殊处理后,然后在发送到Consumer。

那在这里被替换后,是怎么保证延迟发送呢?

继续往下

由于对源码的不熟悉,也不了解,其实费了一些功夫,发现ScheduleMessageService.java 有start方法

 public void start() { 
   

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 
   
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) { 
   
                offset = 0L;
            }

            if (timeDelay != null) { 
   
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() { 
   

            @Override
            public void run() { 
   
                try { 
   
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) { 
   
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

从start方法中可以看到,这个时候就启动了定时器,开始从队列里获取数据了。 那么start方法是怎么被调用的呢?
在这里插入图片描述
在DefaultMessageStore中启动的。

接下来我们还是把注意力放在 ScheduleMessageService.start方法的执行过程吧。
通过源码追踪,就看到了这个方法

   public void executeOnTimeup() { 

ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) { 

SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) { 

try { 

long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 

long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) { 

if (cq.getExt(tagsCode, cqExtUnit)) { 

tagsCode = cqExtUnit.getTagsCode();
} else { 

//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) { 

MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) { 

try { 

MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { 

continue;
} else { 

// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) { 

/* * XXX: warn and notify me */
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else { 

ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally { 

bufferCQ.release();
}
} // end of if (bufferCQ != null)
else { 

long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) { 

failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}

但是注意处理的逻辑就在这里了。 如果到了延迟时间,就发送消息 否则就继续进行延迟返送。
在这里插入图片描述
总结,RocketMQ的延迟消息,使用起来方便,而且解耦代码,但是配置的延迟时间不够灵活。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/234086.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • dfs算法java(java算法预测)

    packagecom.yangkaile.generator;importlombok.extern.slf4j.Slf4j;importorg.junit.jupiter.api.Test;importjava.util.*;/***@description:DFA算法案例*@className:ApplicationTest*@author:wangdong*@Date:2021/7/2615:56*/@Slf4jpublicclas.

  • arm-linux 开发步骤

    arm-linux 开发步骤ARM-Linux开发步骤拿到一块YC2440(s3c2440)的开发板,经过几天的学习,我对arm-linux系统开发步骤有了一些认识。就以开发这个开发板为例,arm-linux开发工作大概分4个部分1.       硬件(hardware)2.       引导加载器(bootloader)3.       内核(kernel)4.       文件系统(file

  • 三星ODIN刷机包的修改

    三星ODIN刷机包的修改SunnyOK系列讲座索引【第一讲】如何用Odin刷机-新手必读http://bbs.gfan.com/android-1653492-1-1.html【第二讲】I897卡刷或CWM刷机教程http://bbs.gfan.com/android-1701867-1-1.html【第三讲】APK应用程序的解包、修改、编辑、打包及应用http://bbs

  • osip和mysql_osip2/eXosip2调试笔记

    osip和mysql_osip2/eXosip2调试笔记软件版本:libosip2-3.3.0.tar.gzlibeXosip2-3.3.0.tar.gz./configure–prefix=/opt/sip/target–disable-staticmakemakeinstall测试代码:代码来源:http://blog.csdn.net/bat603/archive/2006/11/15/1386277.aspx1、UAS…

  • 可用的NTP服务器地址「建议收藏」

    可用的NTP服务器地址「建议收藏」国内可用的Internet时间同步服务器地址(NTP时间服务器)好在阿里云提供了7个NTP时间服务器也就是Internet时间同步服务器地址ntp1.aliyun.comntp2.aliyun.comntp3.aliyun.comntp4.aliyun.comntp5.aliyun.comntp6.aliyun.comntp7.aliyun.com以下红色部分是我能ping通的地址,不能平通的也…

  • vue项目查看vue版本及cli版本

    vue项目查看vue版本及cli版本查看cli版本,执行如下:vue-V查看vue版本npmlistvue

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号