rocketmq延迟队列原理_rocketmq延迟队列原理

rocketmq延迟队列原理_rocketmq延迟队列原理在java的延迟队列中,无法支持集群的延迟。Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。~那就是RocketMQ延时队列。RocketMQ将延时队列的延时延时时间分为18个级别123456789101112131415161718分别对应下面的延迟时间,在使用时,直接传递level即可。mess

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

在java的延迟队列中,无法支持集群的延迟。
Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。
今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。 ~ 那就是 RocketMQ 延时队列。

RocketMQ将延时队列的延时延时时间分为18个级别

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 分别对应下面的延迟时间,在使用时,直接传递 level即可。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
当然这个时间可以自己修改,如果不维护 则按照默认的

在发送MQ消息的时候只需要设置

Message.setDelayTimeLevel(delayLevel);

Jetbrains全家桶1年46,售后保障稳定

MQ发送的代码:

public class DelayMQProducerTest { 
   
    public static void main(String[] args) throws MQClientException, InterruptedException { 
   
        DefaultMQProducer producer = new DefaultMQProducer("delay_test_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        try { 
   
            for (int i = 0; i < 3; i++) { 
   
                Message msg = new Message("Topic_Delay_Test",// topic
                        "Tag_Delay",// tag
                        (new Date() + "Topic_Delay_Test" + i).getBytes()// body
                );
                msg.setDelayTimeLevel(2); // 设置延迟级别为2 也就是 5s 
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
        } catch (Exception e) { 
   
            e.printStackTrace();
        }
       producer.shutdown();
    }
 
}

接下来就跟进到代码里看是RocketMQ是如何是做到延迟发送消息的。

本人使用的是rocketMQ 4.2 下载地址

进入Message可以看到两个方法:


  // 获取延迟等级
    public int getDelayTimeLevel() { 
   
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) { 
   
            return Integer.parseInt(t);
        }

        return 0;
    }
    // 设置延迟等级
    public void setDelayTimeLevel(int level) { 
   
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

既然设置方法可以看到,那通过获取Level追看哪里使用,然后研究对应的实现。
获取Level的代码位置
在这里插入图片描述
在这里将topic和queueId替换为延迟队列的队列(SCHEDULE_TOPIC_XXXX),这样就保证消息不会立即被发送出去。 而是经过SCHEDULE_TOPIC_XXXX的特殊处理后,然后在发送到Consumer。

那在这里被替换后,是怎么保证延迟发送呢?

继续往下

由于对源码的不熟悉,也不了解,其实费了一些功夫,发现ScheduleMessageService.java 有start方法

 public void start() { 
   

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 
   
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) { 
   
                offset = 0L;
            }

            if (timeDelay != null) { 
   
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() { 
   

            @Override
            public void run() { 
   
                try { 
   
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) { 
   
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

从start方法中可以看到,这个时候就启动了定时器,开始从队列里获取数据了。 那么start方法是怎么被调用的呢?
在这里插入图片描述
在DefaultMessageStore中启动的。

接下来我们还是把注意力放在 ScheduleMessageService.start方法的执行过程吧。
通过源码追踪,就看到了这个方法

   public void executeOnTimeup() { 
   
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) { 
   
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) { 
   
                    try { 
   
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 
   
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) { 
   
                                if (cq.getExt(tagsCode, cqExtUnit)) { 
   
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else { 
   
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) { 
   
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) { 
   
                                    try { 
   
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.defaultMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { 
   
                                            continue;
                                        } else { 
   
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) { 
   
                                        /* * XXX: warn and notify me */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else { 
   
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally { 
   

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else { 
   

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) { 
   
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

但是注意处理的逻辑就在这里了。 如果到了延迟时间,就发送消息 否则就继续进行延迟返送。
在这里插入图片描述
总结,RocketMQ的延迟消息,使用起来方便,而且解耦代码,但是配置的延迟时间不够灵活。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/234086.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Java和c++哪个就业前景好

    Java和c++哪个就业前景好二、回顾整理阿里面试题基本就这样了,还有一些零星的问题想不起来了,答案也整理出来了。自我介绍JVM如何加载一个类的过程,双亲委派模型中有哪些方法?HashMap如何实现的?HashMap和ConcurrentHashMap区别,ConcurrentHashMap线程安全hashtable吗,ConcurrentHashMap如何保证线程安全?HashMap和HashTable区别,HashTable线程安全吗?进程间通信有哪几种方式JVM分为哪些区,每一个区干吗的?JVM如

  • hikaripool信息_HikariPool源码(三)资源池动态伸缩「建议收藏」

    hikaripool信息_HikariPool源码(三)资源池动态伸缩「建议收藏」Java极客|作者/铿然一叶这是Java极客的第54篇原创文章1.资源池的动态伸缩1.为了提升资源池的性能,需要设置最小闲置资源数量,在资源池初始化时完成初始化;而当使用的资源超过最小闲置资源数,消费者释放回池中超过一定时间后要收缩到最小闲置资源数。2.为了避免无限申请资源导致超出负载,需要设置最大资源数,池中资源不能超出最大资源数。2.动态伸缩相关类结构职责说明:类职责Hou…

  • JSP/Servlet工作原理[通俗易懂]

    JSP/Servlet工作原理[通俗易懂]JSP/Servlet工作原理ServletServlet没有main方法,不能够独立运行,它的运行需要容器的支持,Tomcat是最常用的JSP/Servlet容器。Servlet运行在Servlet容器中,并由容器管理从创建到销毁的整个过程Servlet的生命周期加载和实例化Servlet容器装载和实例化一个Servlet。创建出该Servlet类的一个实例。初始化在Servlet

  • Northwind数据库关系图[通俗易懂]

    Northwind数据库关系图[通俗易懂]鉴于很MS的示例都是利用Sql2000的NorthWind示例数据库来讲解的,今天在学习LInq时,顺便把Northwind各表之间的关系图整理了一下,方便初学者查阅 

  • 博客中KindEditor配置[通俗易懂]

    博客中KindEditor配置[通俗易懂]1.下载 KindEditor2.放在static/js下3.在admin.py中配置文件:4.在static/js/kindeditor下配置config.js文件首先在kindedito官网文档中,找到代码:根据代码提示编辑:5.编辑图片和文件上传代码:#-*-coding:utf-8-*-fromdjango.httpimp…

    2022年10月12日
  • SQL数据库数据类型_数据表的常见数据类型有哪些

    SQL数据库数据类型_数据表的常见数据类型有哪些文章目录1. 整数型● bigint(大整数)● int(整数)● smallint(短整数)● tinyint(微短整数)2. 精确数值型numeric | decimal(p[,s])3. 浮点型● real● float[(n)]4. 货币型●money● smallmoney5. 位型6. 字符型●char[(n)]●varchar[(n)]7. Unicode字符型● nchar[(n)…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号