Jafka来源分析——Processor

Jafka来源分析——Processor

大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。

Jafka Acceptor接受client而建立后的连接请求,Acceptor会将Socket连接交给Processor进行处理。Processor通过下面的处理步骤进行client请求的处理:

1. 读取client请求。

2. 依据client请求类型的不同,调用对应的处理函数进行处理。

Processor读取client请求是一个比較有意思的事情,须要考虑两个方面的事情:第一,请求规则(Processor须要依照一定的规则进行请求的解析)。第二,怎样确定一次请求的读取已经结束(由于是非堵塞连接,很有可能第一次读操作读取了请求的一部分数据,第二次到第N次读取才干把整个client请求读取完整)。以下我们具体解析一下client请求的格式。

client请求首先包括一个int,该int指明本次client请求的大小(size)。随后,请求包括一个两个byte(short)的请求类型(请求类型包括:CreaterRequestDeleterRequestFetchRequestMultiFetchRequestMultiProducerRequestOffsetRequestProducerRequest。然后每种请求类型有固定的格式。下图具体说明了ProducerRequest的格式:

Jafka来源分析——Processor


知道了上面的格式之后,问题二(怎样确定一次请求已经读取完毕)就非常easy攻克了。

首先为“请求长度”分配一个4byteByteBuffer,直到该Buffer读满,否则说明长度一直没有读取完毕。“请求长度”读取完毕后,为请求分配一个“请求长度”大小的ByteBuffer,直到该Buffer读满则说明一次请求读取完毕。读取完毕后,依据“请求类型”调用对应的处理函数(Handler)进行处理。在jafka中,上述的两个Buffer在类BoundedByteBufferReceive中进行声明和管理。Processor接收到Acceptor分配的socket连接后。会为socke连接建立一个BoundedByteBufferReceive并将其与socket连接进行绑定。每当该socket连接“可读”时。将BoundedByteBufferReceive拿出来从上次读取的基础上继续读取。直到一次请求彻底读取完毕,详细过程如以下代码(Processor.read)所看到的:

private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = channelFor(key);
Receive request = null;
request = new BoundedByteBufferReceive(maxRequestSize);
key.attach(request);
} else {
request = (Receive) key.attachment();
}
int read = request.readFrom(socketChannel);
stats.recordBytesRead(read);
if (read < 0) {
close(key);
} else if (request.complete()) {
Send maybeResponse = handle(key, request);
key.attach(null);
// if there is a response, send it, otherwise do nothing
if (maybeResponse != null) {
key.attach(maybeResponse);
key.interestOps(SelectionKey.OP_WRITE);
}
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ);
getSelector().wakeup();
if (logger.isTraceEnabled()) {
logger.trace("reading request not been done. " + request);
}
}
}

BoundedByteBufferReceive.readFrom的实现详细例如以下:主要是申请两个Buffer并不断的读取数据。

public int readFrom(ReadableByteChannel channel) throws IOException {
        expectIncomplete();
        int read = 0;
        if (sizeBuffer.remaining() > 0) {
            read += Utils.read(channel, sizeBuffer);
        }
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= 0) {
                throw new InvalidRequestException(...);
            }
            if (size > maxRequestSize) {
                final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
                throw new InvalidRequestException(format(msg, size, maxRequestSize));
            }
            contentBuffer = byteBufferAllocate(size);
        }
        //
        if (contentBuffer != null) {
            read = Utils.read(channel, contentBuffer);
            //
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();
                setCompleted();
            }
        }
        return read;
    }

读取完毕后,Processor会解析“请求类型”,依据请求类型的不同调用不同的Handler处理对应于该请求。

版权声明:本文博主原创文章,博客,未经同意不得转载。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/116982.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 微型计算机的字节取决于什么的宽度,计算机字长取决于什么的长度[通俗易懂]

    微型计算机的字节取决于什么的宽度,计算机字长取决于什么的长度[通俗易懂]满意答案ajpno2013.03.28采纳率:44%等级:12已帮助:10279人计算机的字长取决于数据总线的宽度.字长是指计算机内部参与运算的数的位数。它决定着计算机内部寄存器、ALU和数据总线的位数,直接影响着机器的硬件规模和造价。字长直接反映了一台计算机的计算精度,为适应不同的要求及协调运算精度和硬件造价间的关系,大多数计算机均支持变字长运算,即机内可实现半字长、全字长(或单字长)和…

  • CICD之Jenkins使用

    CICD之Jenkins使用一、Jenkins1、Docker安装Jenkins1.docker安装dockerrun\-uroot\-d\-p8080:8080\-p50000:50000\-vjenkins-data:/var/jenkins_home\-v/var/run/docker.sock:/var/run/docker.sock\jenkinsci/blueocean2.可选镜像jenkins/jenkins:lts#可选镜像jenkin

  • 远程连接mysql8,报错10061 解决办法「建议收藏」

    远程连接mysql8,报错10061 解决办法「建议收藏」mysql8.0的1,检查服务器mysql服务是否启动100612,mysql必须设置密码,不然报错10061usemysql;updateusersetauthentication_string=””whereuser=“root”;flushprivileges;ALTERUSER‘root’@’%’IDENTIFIEDWITHmysql_native_passwordBY‘密码’;3,selectHost,Userfromuser;查看roo

    2022年10月13日
  • java线程与cpu线程_坑惨了什么意思

    java线程与cpu线程_坑惨了什么意思在java中,线程间的通信可以使用wait、notify、notifyAll来进行控制。从名字就可以看出来这3个方法都是跟多线程相关的,但是可能让你感到吃惊的是:这3个方法并不是Thread类或者是Runnable接口的方法,而是Object类的3个本地方法。下图是我总结的Java资料,想要资料的话请点795983544暗号CSDN。其实要理解这一点也并不难,调用一个Object的wait与notify/notifyAll的时候,必须保证调用代码对该Object是同步的,也就是说必须在作用等.

  • nomasp 博客导读:Android、UWP、Algorithm、Lisp(找工作中……[通俗易懂]

    nomasp 博客导读:Android、UWP、Algorithm、Lisp(找工作中……

  • 【C语言】贪吃蛇游戏的实现(一)[通俗易懂]

    【C语言】贪吃蛇游戏的实现(一)[通俗易懂]最近由于小创需要,捣鼓了一个贪吃蛇游戏,系统由纯C语言开发,VC++6.0编译通过,具体的运行效果如下:略显简陋的开始界面,图案是我一点一点拼的,因为有\需要转义,所以对齐也花了不少时间,足够繁杂和无聊的工作。游戏界面如图所示,左侧方框是地图,菱形组成的线是蛇,红色圆点是食物,吃到加分。游戏结束界面可以选择再来一局或者退出。系统源码链接:https://download…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号