大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE稳定放心使用
RocketMQ消费端有两种获取消息的方式,Push方式和Pull方式。但这两种方式都有一定的缺陷,后来采用了一种折中的方法,采用”长轮询“的方式,它既可以拥有Pull的优点,又能达到保证实时性的目的。
长轮询的思想:
服务端接收到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5秒),然后再Check。Broker默认最长阻塞时间为15秒,默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过最长阻塞时间,就返回空结果。在等待的过程中,Broker收到了新的消息后会直接返回请求结果。
“长轮询”的核心是,Broker端hold住客户端过来的请求一小段时间。在这段时间内有新的消息到达,就利用现有的连接立即返回消息给Consumer。
何时调用?
当未在Broker中查找到新信息时,状态代码为PULL_NOT_FOUND,会创建拉取任务PullRequest并提交到PullRequestHoldService线程中。
private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);
该类中有一个重要的参数pullRequestTable
,key为“主题@队列号”,value是对应的ManyPullRequest。
先看一下它的run方法。
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
//Consumer订阅消息时,Broker是否开启长轮询
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { //开启长轮询,每5秒尝试一次
this.waitForRunning(5 * 1000);
} else {
//没有开启长轮询,默认等待1秒再次尝试 this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
从上面可以看出,如果开启了长轮询,每5s尝试一次,利用checkHoldRequest方法来判断是否有新消息的产生。如果未开启长轮询,则默认1s再次尝试。
然后再阅读一下checkHoldRequest方法。
private void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
从上面可以看出,它会遍历pullRequestTable,从key名中可以得到主题名topic和队列名queueId,然后通过topic和queueID获取到该消息队列的最大偏移量,之后调用notifyMessageArriving方法。
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
long newestOffset = maxOffset;
if (newestOffset <= request.getPullFromThisOffset()) {
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
}
if (newestOffset > request.getPullFromThisOffset()) {
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
}
if (match) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
mpr.addPullRequest(replayList);
}
}
}
}
在notifyMessageArriving
方法中,首先会获取到当前该主题、队列中的所有的挂起拉取任务,如果该消息队列的最大偏移量大于待拉取偏移量,说明有新的消息传入。如果消息匹配后,则调用executeRequestWhenWakeup
将消息返回给消息拉取客户端,否则等待下一次尝试。
如果挂起超时时间超时,则不继续等待将直接返回客户消息未找到。
从上面的机制可以看出开启长轮询后,不是实时的进行判断是否有新的消息产生,而是等待5s后再进行一次判断,不具有实时性。
在消息存储中,存在一个线程ReputMessageService,它会实时更新消息消费队列和索引文件,每执行一次任务推送后会休息1毫秒就继续尝试推送消息到消费队列和索引文件。
当新消息达到CommitLog时,ReputMessageService线程负责将消息转发给ConsumeQueue、IndexFile,如果Broker端开启了长轮询模式并且角色主节点,则最终将调用PullRequestHoldService线程的notifyMessageArriving方法唤醒挂起线程,判断当前消费队列最大偏移量是否大于待拉取偏移量,如果大于则拉取消息。长轮询模式使得消息消息拉取能够实现准实时。
0人点赞
作者:九点半的马拉
链接:https://www.jianshu.com/p/68123e7bf03e
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/181992.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...