《Dubbo进阶一》——RPC协议底层原理

《Dubbo进阶一》——RPC协议底层原理一RPC协议简介在一个典型的RPC的使用场景中,包含了服务发现、负载、容错、序列化和网络传输等组件,其中RPC协议指明了程序如何进行序列化和网络传输,也就是说一个RPC协议的实现等于一个非透明的RPC调用。简单来说,分布式框架的核心是RPC框架,RPC框架的核心是RPC协议。二协议的基本组成IP:服务提供者的地址端口:协议指定开放端口运行服务(1)netty(2)mima…

大家好,又见面了,我是你们的朋友全栈君。

一 RPC协议简介

在一个典型的RPC的使用场景中,包含了服务发现、负载、容错、序列化和网络传输等组件,其中RPC协议指明了程序如何进行序列化和网络传输,也就是说一个RPC协议的实现等于一个非透明的RPC调用。
在这里插入图片描述
简单来说,分布式框架的核心是RPC框架,RPC框架的核心是RPC协议

dubbo 支持的RPC协议列表

名称 实现描述 连接描述 使用场景
dubbo 传输服务: mina, netty(默认), grizzy; 序列化: dubbo, hessian2(默认), java, fastjson。 自定义报文 单个长连接NIO;异步传输 1.常规RPC调用 2.传输数据量小 3.提供者少于消费者
rmi 传输:java rmi 服务; 序列化:java原生二进制序列化 多个短连接; BIO同步传输 1.常规RPC调用 2.与原RMI客户端集成 3.可传少量文件 4.不支持防火墙穿透
hessian 传输服务:servlet容器; 序列化:hessian二进制序列化 基于Http 协议传输,依懒servlet容器配置 1.提供者多于消费者 2.可传大字段和文件 3.跨语言调用
http 传输服务:servlet容器; 序列化:http表单 依懒servlet容器配置 1、数据包大小混合
thrift 与thrift RPC 实现集成,并在其基础上修改了报文头 长连接、NIO异步传输

(PS:本文只探讨dubbo协议)

二 协议的基本组成

在这里插入图片描述

  1. IP:服务提供者的地址
  2. 端口:协议指定开放端口
  3. 运行服务
    (1)netty
    (2)mima
    (3)rmi
    (4)servlet容器(Jetty、Tomcat、Jboss)
  4. 协议报文编码
  5. 序列化方式
    (1)Hessian2Serialization
    (2)DubboSerialization
    (3)JavaSerialization
    (4)JsonSerialization

三 Duboo的RPC协议报文

先看下http协议报文格式
在这里插入图片描述
在这里插入图片描述
同样,Dubbo也有自己的报文格式
在这里插入图片描述
以head+request body或head+response body的形式存在

  • head
    1标志位:表明是请求还是响应还是事件
    2status:表明状态是OK还是不OK
  • request body
    1Dubbo版本号
    2接口路径
    3接口版本
    4方法名称
    5参数类型
    6参数值
  • response body
    1结果标志(无结果、有结果、异常)
    2结果

协议的编解码过程:
在这里插入图片描述

四 源码探究

以明晰编码解码和序列化反序列化为目的探究源码。其实就是如上图所示的协议的编解码过程。

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec是很重要的一个类,无论是request还是response,还有编码解码都在这里类进行调度。

DubboCodec:
在这里插入图片描述
其中重点关注三个方法
decodeBody():解码(请求或响应)以及序列化和反序列化
encodeRequestData():编码请求(发生在Consumer)
encodeResponseData():编码响应(发生在Provider)

1.编码序列化request

发生在Consumer发请求之前
encodeRequestData()

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        RpcInvocation inv = (RpcInvocation)data;
        out.writeUTF(inv.getAttachment("dubbo", DUBBO_VERSION));
        out.writeUTF(inv.getAttachment("path"));
        out.writeUTF(inv.getAttachment("version"));
        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null) {
            for(int i = 0; i < args.length; ++i) {
                out.writeObject(CallbackServiceCodec.encodeInvocationArgument(channel, inv, i));
            }
        }

        out.writeObject(inv.getAttachments());
    }

参数ObjectOutput是序列化接口,具体调用什么实现类有配置决定,如没有则默认是hessian2。能用的子类(序列化方式)如下
在这里插入图片描述

RpcInvocation拿到datadata是请求的基本内容,也就是第三部分所说的request body的六个模块:Dubbo版本号、接口路径、接口版本、方法名称、参数类型、参数值。
writeUTF()将版本号、接口路径、接口版本、方法名和参数称写进序列化类。
最后的writeObject() 通过配置的序列化方式调用相应的实现类进行序列化,如在protocol配置了serialization=“fastjson”,将调用FastJsonObjectOutput实现类的writeObject()
在这里插入图片描述
编码序列化request完成

2.编码序列化response

发生在Provider发出响应之前。
encodeResponseData

protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
        Result result = (Result)data;
        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                out.writeByte((byte)2);
            } else {
                out.writeByte((byte)1);
                out.writeObject(ret);
            }
        } else {
            out.writeByte((byte)0);
            out.writeObject(th);
        }

    }

过程与编码序列化request类似且较为简单,不再多说。

3.解码反序列化request和response

解码反序列化request发生在Provider;解码反序列化response发生在Consumer。两个方法在同个方法中,就一起讲了。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
       byte flag = header[2];
       byte proto = (byte)(flag & 31);
       Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
       long id = Bytes.bytes2long(header, 4);
       if ((flag & -128) == 0) {
           Response res = new Response(id);
           if ((flag & 32) != 0) {
               res.setEvent(Response.HEARTBEAT_EVENT);
           }

           byte status = header[3];
           res.setStatus(status);
           if (status == 20) {
               try {
                   Object data;
                   if (res.isHeartbeat()) {
                       data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else if (res.isEvent()) {
                       data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else {
                       DecodeableRpcResult result;
                       if (channel.getUrl().getParameter("decode.in.io", true)) {
                           result = new DecodeableRpcResult(channel, res, is, (Invocation)this.getRequestData(id), proto);
                           result.decode();
                       } else {
                           result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(this.readMessageData(is)), (Invocation)this.getRequestData(id), proto);
                       }

                       data = result;
                   }

                   res.setResult(data);
               } catch (Throwable var13) {
                   if (log.isWarnEnabled()) {
                       log.warn("Decode response failed: " + var13.getMessage(), var13);
                   }

                   res.setStatus((byte)90);
                   res.setErrorMessage(StringUtils.toString(var13));
               }
           } else {
               res.setErrorMessage(this.deserialize(s, channel.getUrl(), is).readUTF());
           }

           return res;
       } else {
           Request req = new Request(id);
           req.setVersion("2.0.0");
           req.setTwoWay((flag & 64) != 0);
           if ((flag & 32) != 0) {
               req.setEvent(Request.HEARTBEAT_EVENT);
           }

           try {
               Object data;
               if (req.isHeartbeat()) {
                   data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
               } else if (req.isEvent()) {
                   data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
               } else {
                   DecodeableRpcInvocation inv;
                   if (channel.getUrl().getParameter("decode.in.io", true)) {
                       inv = new DecodeableRpcInvocation(channel, req, is, proto);
                       inv.decode();
                   } else {
                       inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(this.readMessageData(is)), proto);
                   }

                   data = inv;
               }

               req.setData(data);
           } catch (Throwable var14) {
               if (log.isWarnEnabled()) {
                   log.warn("Decode request failed: " + var14.getMessage(), var14);
               }

               req.setBroken(true);
               req.setData(var14);
           }

           return req;
       }
   }

需要注意的是来到这个方法表明请求头已经处理好,现在是处理body。
flag通过header拿到标志位。
第一个if语句(flag & -128) == 0,实际上是在判断是request还是response,若为true为response,也就是Consumer要解码反序列化从Provider发来的响应;若为false为request,也就是Provider要解码反序列化从Consumer发来的请求。

(1)解码反序列化request

(flag & -128) == 0为false时,进入else执行体,在服务端进行操作。
if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。
if (req.isHeartbeat())判断是否时一个心跳事件,else if (req.isEvent())判断是否时一个事件
排除了这两个之后就是真正的request。
inv拿到request相关参数,inv.decode()进行解码和反序列化。
调用DecodeableRpcInvocationdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        this.setAttachment("dubbo", in.readUTF());
        this.setAttachment("path", in.readUTF());
        this.setAttachment("version", in.readUTF());
        this.setMethodName(in.readUTF());

        try {
            String desc = in.readUTF();
            Object[] args;
            Class[] pts;
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];

                for(int i = 0; i < args.length; ++i) {
                    try {
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception var9) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + var9.getMessage(), var9);
                        }
                    }
                }
            }

            this.setParameterTypes(pts);
            Map<String, String> map = (Map)in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = this.getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                }

                ((Map)attachment).putAll(map);
                this.setAttachments((Map)attachment);
            }

            for(int i = 0; i < args.length; ++i) {
                args[i] = CallbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            this.setArguments(args);
            return this;
        } catch (ClassNotFoundException var10) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", var10));
        }
    }

其中ObjectInput选择的序列化方式实现子类依然时根据配置文件来的,只有与客户端序列化的方式一样才能反序列化成功。接下来是逐个readUTF()解码request body的模块。try代码块里的readUTF()解码出参数类型和参数值。最后将dubbo的隐式参数也一同设置进去Map<String, String> map = (Map)in.readObject(Map.class),到这里DecodeableRpcInvocation拿到所有相关参数,后续可以进行业务操作。
解码反序列化request完成

(2)解码反序列化response

(flag & -128) == 0为true时,进入if执行体,在客户端进行操作。
if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。
status从header拿到状态码,如果不等于20,直接进入else执行错误信息写入到responseres.setErrorMessage()
if (req.isHeartbeat()判断是否时一个心跳事件,else if (req.isEvent()判断是否时一个事件
排除了这两个之后就是真正的response。
result拿到response相关参数,result .decode()进行解码和反序列化。
调用DecodeableRpcResultdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        byte flag = in.readByte();
        switch(flag) {
        case 0:
            try {
                Object obj = in.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                }

                this.setException((Throwable)obj);
                break;
            } catch (ClassNotFoundException var6) {
                throw new IOException(StringUtils.toString("Read response data failed.", var6));
            }
        case 1:
            try {
                Type[] returnType = RpcUtils.getReturnTypes(this.invocation);
                this.setValue(returnType != null && returnType.length != 0 ? (returnType.length == 1 ? in.readObject((Class)returnType[0]) : in.readObject((Class)returnType[0], returnType[1])) : in.readObject());
            } catch (ClassNotFoundException var7) {
                throw new IOException(StringUtils.toString("Read response data failed.", var7));
            }
        case 2:
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }

        return this;
    }

一开始就调用getSerialization()进行反序列化,然后赋给ObjectInput。
判断flag,0为发生异常,并处理异常信息;2为没值,直接退出方法。
当等于1时对response进行解码,调用setValue()将信息读出来。
解码反序列化response完成

4.业务调用

在这里插入图片描述
了解是如何编码序列化等操作之后,最后看下服务端接收到请求整个流程是如何调用的。(客户端接收到响应类似)
在这里插入图片描述
以dubbo默认的传输服务netty为例,存在一个重要的类:
com\alibaba\dubbo\remoting\transport\netty\NettyServer.class
(客户端为NettyClient)
在这里插入图片描述
其中的doOpen()方法,表示打开服务

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.bootstrap.setOption("keepAlive", true);
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("connectTimeoutMillis", this.getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(this.getUrl(), this);
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(NettyClient.this.getCodec(), NettyClient.this.getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

三个pipeline.addLast()操作对应解码、编码以及解码后的操作。编解码上面已经说过,这里主要探究解码后的操作。

解码完成后带着参数发起对AllDispatcher类的调用

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";

    public AllDispatcher() {
    }

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

可以看到它又调用了ChannelHandler接口来处理,最终是返回调用AllChannelHandler实现类。
在这里插入图片描述
其中在received()方法中进行线程派发

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = this.getExecutorService();

        try {
            cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
        } catch (Throwable var8) {
            if (message instanceof Request && var8 instanceof RejectedExecutionException) {
                Request request = (Request)message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + this.url.getIp() + "," + this.url.getPort() + ") threadpool is exhausted ,detail msg:" + var8.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus((byte)100);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }

            throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var8);
        }
    }

传进来的参数Object message包含request。
ExecutorService cexecutor拿到对应的线程池。
调用cexecutor.execute()执行,执行时调用了ChannelEventRunnable,在ChannelEventRunnable这个类的run()方法就调用了我们自己写的业务方法。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/144671.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 吉木萨尔县文化旅游策划案——天山圣地,武侠之都!「建议收藏」

    吉木萨尔县文化旅游策划案——天山圣地,武侠之都!「建议收藏」吉木萨尔县文化旅游策划案——天山圣地,武侠之都!熊大寻旅游策划公司/文2011年熊大寻旅游策划公司受邀对新疆吉木萨尔县进行旅游策划和规划,我们的核心策略是:吉木萨尔有空前的历史性机会——天山旅游并没有真正做起来!天山旅游仅限于天池,每年几十万游客跟天山的品牌严重不匹配。天山旅游目前仅限于自然景观,天山厚重的历史文化呢?到哪里领略?到哪里体验?天山历史文化的中心和重心在哪里?吉木萨尔!新疆旅游…

  • web性能优化–用gzip压缩资源文件

    web性能优化–用gzip压缩资源文件一、gzip压缩技术gzip(GNU-ZIP)是一种压缩技术。经过gzip压缩后页面大小可以变为原来的30%甚至更小,这样,用户浏览页面的时候速度会快得多。gzip的压缩页面需要浏览器和服务器双方都支持,实际上就是服务器端压缩,传到浏览器后浏览器解压并解析。浏览器那里不需要我们担心,因为目前的大多数浏览器都支持解析gzip压缩过的资源文件。在实际的应用中我们发现压缩的比率往往在3到10倍,也…

  • 改进神经风格迁移_癌细胞能否沿着神经迁移

    改进神经风格迁移_癌细胞能否沿着神经迁移神经风格迁移使用CNN将一幅图像的艺术风格转移到另一幅图像。但神经风格迁移存在两个缺陷,首先是神经风格迁移基于神经网络训练反向传播,因此速度较慢,同时风格迁移会获取风格图像所有风格信息,包括颜色和笔触等,不能进行更好的控制。因此许多论文和应用针对原始的神经风格迁移的缺点进行了改进。学会神经风格迁移,免费获取价值百元DIY数字油画定制照片。

    2022年10月26日
  • ntp 校时 linux 带源码

    ntp 校时 linux 带源码最近做个项目,想通过公司上的NTP服务器给板子校时,但是板子里没有ntpdate这个命令,下面是2个解决方法,1,找到ntpdate源代码,重新编译之后,手动运行,这个方法我上网上查了,比较复杂,据说NTP还与SSL有关,编译的时候必须把SSL也包含进去,于是就迟迟没有动工。2,突然有一天看到rtthread里也提供一个ntp的客户端,比较简单,就一个文件,也没几行,于是想着把这个.c文件移植到linux下,但我仔细研究了一下,发现这个文件的原始作者就是在linux下设计的。原始文件更简单.

  • LinkedList和ArrayList的区别[通俗易懂]

    LinkedeList和ArrayList都实现了List接口,但是它们的工作原理却不一样。它们之间最主要的区别在于ArrayList是可改变大小的数组,而LinkedList是双向链接串列(doubly LinkedList)。ArrayList更受欢迎,很多场景下ArrayList比LinkedList更为适用。这篇文章中我们将会看看LinkedeList和ArrayList的不同,而且我们试

  • win10显卡驱动怎么装_win10系统显卡驱动安装失败怎么办

    win10显卡驱动怎么装_win10系统显卡驱动安装失败怎么办大家好,今天分享一篇来自小白系统官网(xiaobaixitong.com)的图文教程。我们日常在对电脑的使用过程中,经常都会遇到这样或那样的问题。比如说win10系统显卡驱动安装失败该怎么办呢?别着急,还有小编在呢?接下来小编就来告诉大家win10电脑系统显卡驱动安装失败怎么解决。详细教你win10系统显卡驱动安装失败怎么办:方法一,删除之前的显卡驱动文件重新安装1,首先,右键点击“此电脑”,菜单…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号