《Dubbo进阶一》——RPC协议底层原理

《Dubbo进阶一》——RPC协议底层原理一RPC协议简介在一个典型的RPC的使用场景中,包含了服务发现、负载、容错、序列化和网络传输等组件,其中RPC协议指明了程序如何进行序列化和网络传输,也就是说一个RPC协议的实现等于一个非透明的RPC调用。简单来说,分布式框架的核心是RPC框架,RPC框架的核心是RPC协议。二协议的基本组成IP:服务提供者的地址端口:协议指定开放端口运行服务(1)netty(2)mima…

大家好,又见面了,我是你们的朋友全栈君。

一 RPC协议简介

在一个典型的RPC的使用场景中,包含了服务发现、负载、容错、序列化和网络传输等组件,其中RPC协议指明了程序如何进行序列化和网络传输,也就是说一个RPC协议的实现等于一个非透明的RPC调用。
在这里插入图片描述
简单来说,分布式框架的核心是RPC框架,RPC框架的核心是RPC协议

dubbo 支持的RPC协议列表

名称 实现描述 连接描述 使用场景
dubbo 传输服务: mina, netty(默认), grizzy; 序列化: dubbo, hessian2(默认), java, fastjson。 自定义报文 单个长连接NIO;异步传输 1.常规RPC调用 2.传输数据量小 3.提供者少于消费者
rmi 传输:java rmi 服务; 序列化:java原生二进制序列化 多个短连接; BIO同步传输 1.常规RPC调用 2.与原RMI客户端集成 3.可传少量文件 4.不支持防火墙穿透
hessian 传输服务:servlet容器; 序列化:hessian二进制序列化 基于Http 协议传输,依懒servlet容器配置 1.提供者多于消费者 2.可传大字段和文件 3.跨语言调用
http 传输服务:servlet容器; 序列化:http表单 依懒servlet容器配置 1、数据包大小混合
thrift 与thrift RPC 实现集成,并在其基础上修改了报文头 长连接、NIO异步传输

(PS:本文只探讨dubbo协议)

二 协议的基本组成

在这里插入图片描述

  1. IP:服务提供者的地址
  2. 端口:协议指定开放端口
  3. 运行服务
    (1)netty
    (2)mima
    (3)rmi
    (4)servlet容器(Jetty、Tomcat、Jboss)
  4. 协议报文编码
  5. 序列化方式
    (1)Hessian2Serialization
    (2)DubboSerialization
    (3)JavaSerialization
    (4)JsonSerialization

三 Duboo的RPC协议报文

先看下http协议报文格式
在这里插入图片描述
在这里插入图片描述
同样,Dubbo也有自己的报文格式
在这里插入图片描述
以head+request body或head+response body的形式存在

  • head
    1标志位:表明是请求还是响应还是事件
    2status:表明状态是OK还是不OK
  • request body
    1Dubbo版本号
    2接口路径
    3接口版本
    4方法名称
    5参数类型
    6参数值
  • response body
    1结果标志(无结果、有结果、异常)
    2结果

协议的编解码过程:
在这里插入图片描述

四 源码探究

以明晰编码解码和序列化反序列化为目的探究源码。其实就是如上图所示的协议的编解码过程。

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec是很重要的一个类,无论是request还是response,还有编码解码都在这里类进行调度。

DubboCodec:
在这里插入图片描述
其中重点关注三个方法
decodeBody():解码(请求或响应)以及序列化和反序列化
encodeRequestData():编码请求(发生在Consumer)
encodeResponseData():编码响应(发生在Provider)

1.编码序列化request

发生在Consumer发请求之前
encodeRequestData()

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        RpcInvocation inv = (RpcInvocation)data;
        out.writeUTF(inv.getAttachment("dubbo", DUBBO_VERSION));
        out.writeUTF(inv.getAttachment("path"));
        out.writeUTF(inv.getAttachment("version"));
        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null) {
            for(int i = 0; i < args.length; ++i) {
                out.writeObject(CallbackServiceCodec.encodeInvocationArgument(channel, inv, i));
            }
        }

        out.writeObject(inv.getAttachments());
    }

参数ObjectOutput是序列化接口,具体调用什么实现类有配置决定,如没有则默认是hessian2。能用的子类(序列化方式)如下
在这里插入图片描述

RpcInvocation拿到datadata是请求的基本内容,也就是第三部分所说的request body的六个模块:Dubbo版本号、接口路径、接口版本、方法名称、参数类型、参数值。
writeUTF()将版本号、接口路径、接口版本、方法名和参数称写进序列化类。
最后的writeObject() 通过配置的序列化方式调用相应的实现类进行序列化,如在protocol配置了serialization=“fastjson”,将调用FastJsonObjectOutput实现类的writeObject()
在这里插入图片描述
编码序列化request完成

2.编码序列化response

发生在Provider发出响应之前。
encodeResponseData

protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
        Result result = (Result)data;
        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                out.writeByte((byte)2);
            } else {
                out.writeByte((byte)1);
                out.writeObject(ret);
            }
        } else {
            out.writeByte((byte)0);
            out.writeObject(th);
        }

    }

过程与编码序列化request类似且较为简单,不再多说。

3.解码反序列化request和response

解码反序列化request发生在Provider;解码反序列化response发生在Consumer。两个方法在同个方法中,就一起讲了。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
       byte flag = header[2];
       byte proto = (byte)(flag & 31);
       Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
       long id = Bytes.bytes2long(header, 4);
       if ((flag & -128) == 0) {
           Response res = new Response(id);
           if ((flag & 32) != 0) {
               res.setEvent(Response.HEARTBEAT_EVENT);
           }

           byte status = header[3];
           res.setStatus(status);
           if (status == 20) {
               try {
                   Object data;
                   if (res.isHeartbeat()) {
                       data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else if (res.isEvent()) {
                       data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else {
                       DecodeableRpcResult result;
                       if (channel.getUrl().getParameter("decode.in.io", true)) {
                           result = new DecodeableRpcResult(channel, res, is, (Invocation)this.getRequestData(id), proto);
                           result.decode();
                       } else {
                           result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(this.readMessageData(is)), (Invocation)this.getRequestData(id), proto);
                       }

                       data = result;
                   }

                   res.setResult(data);
               } catch (Throwable var13) {
                   if (log.isWarnEnabled()) {
                       log.warn("Decode response failed: " + var13.getMessage(), var13);
                   }

                   res.setStatus((byte)90);
                   res.setErrorMessage(StringUtils.toString(var13));
               }
           } else {
               res.setErrorMessage(this.deserialize(s, channel.getUrl(), is).readUTF());
           }

           return res;
       } else {
           Request req = new Request(id);
           req.setVersion("2.0.0");
           req.setTwoWay((flag & 64) != 0);
           if ((flag & 32) != 0) {
               req.setEvent(Request.HEARTBEAT_EVENT);
           }

           try {
               Object data;
               if (req.isHeartbeat()) {
                   data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
               } else if (req.isEvent()) {
                   data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
               } else {
                   DecodeableRpcInvocation inv;
                   if (channel.getUrl().getParameter("decode.in.io", true)) {
                       inv = new DecodeableRpcInvocation(channel, req, is, proto);
                       inv.decode();
                   } else {
                       inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(this.readMessageData(is)), proto);
                   }

                   data = inv;
               }

               req.setData(data);
           } catch (Throwable var14) {
               if (log.isWarnEnabled()) {
                   log.warn("Decode request failed: " + var14.getMessage(), var14);
               }

               req.setBroken(true);
               req.setData(var14);
           }

           return req;
       }
   }

需要注意的是来到这个方法表明请求头已经处理好,现在是处理body。
flag通过header拿到标志位。
第一个if语句(flag & -128) == 0,实际上是在判断是request还是response,若为true为response,也就是Consumer要解码反序列化从Provider发来的响应;若为false为request,也就是Provider要解码反序列化从Consumer发来的请求。

(1)解码反序列化request

(flag & -128) == 0为false时,进入else执行体,在服务端进行操作。
if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。
if (req.isHeartbeat())判断是否时一个心跳事件,else if (req.isEvent())判断是否时一个事件
排除了这两个之后就是真正的request。
inv拿到request相关参数,inv.decode()进行解码和反序列化。
调用DecodeableRpcInvocationdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        this.setAttachment("dubbo", in.readUTF());
        this.setAttachment("path", in.readUTF());
        this.setAttachment("version", in.readUTF());
        this.setMethodName(in.readUTF());

        try {
            String desc = in.readUTF();
            Object[] args;
            Class[] pts;
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];

                for(int i = 0; i < args.length; ++i) {
                    try {
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception var9) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + var9.getMessage(), var9);
                        }
                    }
                }
            }

            this.setParameterTypes(pts);
            Map<String, String> map = (Map)in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = this.getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                }

                ((Map)attachment).putAll(map);
                this.setAttachments((Map)attachment);
            }

            for(int i = 0; i < args.length; ++i) {
                args[i] = CallbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            this.setArguments(args);
            return this;
        } catch (ClassNotFoundException var10) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", var10));
        }
    }

其中ObjectInput选择的序列化方式实现子类依然时根据配置文件来的,只有与客户端序列化的方式一样才能反序列化成功。接下来是逐个readUTF()解码request body的模块。try代码块里的readUTF()解码出参数类型和参数值。最后将dubbo的隐式参数也一同设置进去Map<String, String> map = (Map)in.readObject(Map.class),到这里DecodeableRpcInvocation拿到所有相关参数,后续可以进行业务操作。
解码反序列化request完成

(2)解码反序列化response

(flag & -128) == 0为true时,进入if执行体,在客户端进行操作。
if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。
status从header拿到状态码,如果不等于20,直接进入else执行错误信息写入到responseres.setErrorMessage()
if (req.isHeartbeat()判断是否时一个心跳事件,else if (req.isEvent()判断是否时一个事件
排除了这两个之后就是真正的response。
result拿到response相关参数,result .decode()进行解码和反序列化。
调用DecodeableRpcResultdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        byte flag = in.readByte();
        switch(flag) {
        case 0:
            try {
                Object obj = in.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                }

                this.setException((Throwable)obj);
                break;
            } catch (ClassNotFoundException var6) {
                throw new IOException(StringUtils.toString("Read response data failed.", var6));
            }
        case 1:
            try {
                Type[] returnType = RpcUtils.getReturnTypes(this.invocation);
                this.setValue(returnType != null && returnType.length != 0 ? (returnType.length == 1 ? in.readObject((Class)returnType[0]) : in.readObject((Class)returnType[0], returnType[1])) : in.readObject());
            } catch (ClassNotFoundException var7) {
                throw new IOException(StringUtils.toString("Read response data failed.", var7));
            }
        case 2:
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }

        return this;
    }

一开始就调用getSerialization()进行反序列化,然后赋给ObjectInput。
判断flag,0为发生异常,并处理异常信息;2为没值,直接退出方法。
当等于1时对response进行解码,调用setValue()将信息读出来。
解码反序列化response完成

4.业务调用

在这里插入图片描述
了解是如何编码序列化等操作之后,最后看下服务端接收到请求整个流程是如何调用的。(客户端接收到响应类似)
在这里插入图片描述
以dubbo默认的传输服务netty为例,存在一个重要的类:
com\alibaba\dubbo\remoting\transport\netty\NettyServer.class
(客户端为NettyClient)
在这里插入图片描述
其中的doOpen()方法,表示打开服务

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.bootstrap.setOption("keepAlive", true);
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("connectTimeoutMillis", this.getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(this.getUrl(), this);
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(NettyClient.this.getCodec(), NettyClient.this.getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

三个pipeline.addLast()操作对应解码、编码以及解码后的操作。编解码上面已经说过,这里主要探究解码后的操作。

解码完成后带着参数发起对AllDispatcher类的调用

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";

    public AllDispatcher() {
    }

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

可以看到它又调用了ChannelHandler接口来处理,最终是返回调用AllChannelHandler实现类。
在这里插入图片描述
其中在received()方法中进行线程派发

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = this.getExecutorService();

        try {
            cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
        } catch (Throwable var8) {
            if (message instanceof Request && var8 instanceof RejectedExecutionException) {
                Request request = (Request)message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + this.url.getIp() + "," + this.url.getPort() + ") threadpool is exhausted ,detail msg:" + var8.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus((byte)100);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }

            throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var8);
        }
    }

传进来的参数Object message包含request。
ExecutorService cexecutor拿到对应的线程池。
调用cexecutor.execute()执行,执行时调用了ChannelEventRunnable,在ChannelEventRunnable这个类的run()方法就调用了我们自己写的业务方法。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/144671.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Photoshop快捷键大全_alt快捷键大全常用

    Photoshop快捷键大全_alt快捷键大全常用察看图像类别  说明:: —Shift键  :—空格键       *—在Imageready中不适用  §—只在Imageready中可用动作结果双击工具箱::或Ctrl+0 使图像最大限度在当前窗口中完整显示双击工具箱::或Alt+Ctr

  • MATLAB中LSTM算法实例_bresenham直线算法

    MATLAB中LSTM算法实例_bresenham直线算法Gauss-Newton算法MATLAB实现结果回顾算法实现总结结果回顾Gauss-Newton算法对Gauss-newton算法做了详细的解释,并且使用C++做了实例程序。但是程序其实有微小错误,实际的坐标并不是年代1815—1885,而是1—8,否则p=A∗exp(B∗t)p=A*exp(B*t)p=A∗exp(B∗t)拟合时将会迅速增大,也得不到A=0.7A=0.7A=0.7…

  • 置顶文章-波波烤鸭博客文章汇总篇【Java核心,经典开源框架应用及源码分析,企业级解决方案等】强烈建议收藏!!![通俗易懂]

    置顶文章-波波烤鸭博客文章汇总篇【Java核心,经典开源框架应用及源码分析,企业级解决方案等】强烈建议收藏!!![通俗易懂]  因为博客中的文章已经越来越来了,为了便于文章检索,特整理本文,欢迎收藏!!!Java核心1.JDK8新特性Lambda表达式讲解接口新特性函数式接口方法引用Stream流Optional工具类介绍新的日期时间工具类介绍注解的增强2.Java核心Java集合核心内容之数组和链表Java集合核心内容之二叉树2-3-4树详解红黑树详解精讲红黑树删除操作剖析反射的本质3.设计模式3.1创建型模式  都是用来帮助我们创建对象的!模式地址单例模式ht

  • 2022.01.4 idea激活码【2022.01最新】2022.02.04

    (2022.01.4 idea激活码)本文适用于JetBrains家族所有ide,包括IntelliJidea,phpstorm,webstorm,pycharm,datagrip等。IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.cn/100143.html…

  • 字典树的数据结构_数据结构快速排序

    字典树的数据结构_数据结构快速排序本文主要包括以下内容:Trie字典树的基本概念Trie字典树的基本操作插入查找前缀查询删除基于链表的Trie字典树Set性能对比LeetCode相关线段树的问题LeetCode第208号问题LeetCode第211号问题LeetCode第677号问题Trie字典树的基本概念上一篇我们介绍了线段树(SegmentTree),本文主要介绍Trie字典树…

  • 慧荣SM2246主控固态硬盘开卡一直pretest解决方法

    慧荣SM2246主控固态硬盘开卡一直pretest解决方法DIY做了一根慧荣SM2246主控的固态硬盘,贴了两个闪存颗粒是TH58TFG9DDLBA8C,但开卡的时候,从量产部落下载的量产软件只能短接进rommode才能识别,而且开卡一直卡在pretest进度就不走了,这种问题是怎么回事。解决方法是:SM2246的板子,需要把闪存贴到主控测,而上面的问题是因为贴到了反面,所以就算跳线了也不能开卡成功。那么我们只需要把闪存贴到主控测,重新设置跳线,再量产就能开卡成功了,也不会卡到pretest了。…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号