大家好,又见面了,我是你们的朋友全栈君。
手写Dubbo框架
一句话认识Dubbo
Dubbo是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。现在已成为Apache的开源项目。
了解Dubbo
详细了解直接进dubbo官网看中文文档:http://dubbo.apache.org/zh-cn/docs/user/preface/architecture.html
一句话明白RPC
RPC是什么?
RPC(Remote Procedure Call)—远程过程调用
,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互作用编程,如果涉及的软件采用面向对象编程(java),那么远程过程调用亦可称作远程调用
或远程方法调用
。只要支持网络传输的协议就是RPC协议,RPC是一种框架。
手写可扩展的RPC协议
缘起
公司的项目也在用Dubbo,近日又看一关于手写dubbo的视频,于是乎想着手敲一下简单的dubbo框架
项目地址
https://github.com/ghostKang/dubbo-study
多模块设计
按照官网架构图,模块内容设计如下
- 服务端:提供API,启动的时候要注册服务
- 消费端:从注册中心获取服务,调用子服务
- 注册中心:保存服务配置
- RPC协议:基于Tomcat的HttpProtocol,基于Netty的DubboProtocol
由于模块之间还要引用jar包,于是在手写实现时以包的形式代表各个模块
服务端
提供API
也就是接口,实现接口
public interface HelloService {
public void sayHello(String username);
}
public class HelloServiceImpl implements HelloService {
public void sayHello(String username) {
System.out.println("Hello:"+username);
}
}
注册服务,启动tomcat
public class Provider {
public static void main(String[] args) {
// 注册服务
URL url = new URL("localhost",8080);
Register.regist(url, HelloService.class.getName(), HelloServiceImpl.class);
// 启动tomcat
HttpServer httpServer = new HttpServer();
httpServer.start(url.getHostname(),url.getPort());
}
}
注册中心实现
服务注册形式
以接口名为key,通过服务调用地址找到具体实现类为。
在消费端,直接传接口名就可以找到具体实现。
Map<interfacename,Map<URL,Class>>
两个数据bean
Invocation .java
要实现Serializable,在服务消费端设值后序列化成对象流传输,然后在服务提供端转为对象,获取接口名,从注册中心获取实现类,从而调用方法。
public class Invocation implements Serializable {
private String interfaceName;
private String methodName;
private Object[] params;
private Class[] paramTypes;
public Invocation(String interfaceName, String methodName, Object[] params, Class[] paramTypes) {
this.interfaceName = interfaceName;
this.methodName = methodName;
this.params = params;
this.paramTypes = paramTypes;
}
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
public Class[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class[] paramTypes) {
this.paramTypes = paramTypes;
}
}
URL .java
地址接口类
public class URL {
private String hostname;
private Integer port;
public URL(String hostname,Integer port){
this.hostname = hostname;
this.port = port;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
URL url = (URL) o;
if (hostname != null ? !hostname.equals(url.hostname) : url.hostname != null) return false;
return port != null ? port.equals(url.port) : url.port == null;
}
@Override
public int hashCode() {
int result = hostname != null ? hostname.hashCode() : 0;
result = 31 * result + (port != null ? port.hashCode() : 0);
return result;
}
}
具体实现
Register.java
public class Register {
private static Map<String,Map<URL,Class>> REGISTER = new HashMap<String, Map<URL, Class>>();
/**
* 注册服务(暴露接口)
* @param url
* @param interfaceName
* @param implClass
*/
public static void regist(URL url,String interfaceName,Class implClass){
Map<URL,Class> map = new HashMap<URL, Class>();
map.put(url,implClass);
REGISTER.put(interfaceName,map);
}
/**
* 从注册中心获取实现类(发现服务)
* @param url
* @param interfaceName
* @return
*/
public static Class get(URL url,String interfaceName){
return REGISTER.get(interfaceName).get(url);
}
}
HTTP协议
内嵌tomcat启动
引入内嵌tomcat依赖
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>9.0.12</version>
</dependency>
tomcat结构
server.xml
<Server port="8005" shutdown="SHUTDOWN">
<Service name="Catalina">
<Connector port="8080" protocol="HTTP/1.1"
connectionTimeout="20000"
redirectPort="8443"
URIEncoding="UTF-8"/>
<Engine name="Catalina" defaultHost="localhost">
<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true">
<Context path="" doBase="WORKDIR" reloadable="true"/>
</Host>
</Engine>
</Service>
</Server>
是不是很熟悉,根据这个xml结构构建一个tomcat启动类
具体实现
HttpServer.java
public class HttpServer {
public void start(String hostname,Integer port){
// 实例一个tomcat
Tomcat tomcat = new Tomcat();
// 构建server
Server server = tomcat.getServer();
/**
* 在getServer的时候,就在方法内部执行了
* Service service = new StandardService();
* service.setName("Tomcat");
* server.addService(service);
*/
// 获取service
Service service = server.findService("Tomcat");
// 构建Connector
Connector connector = new Connector();
connector.setPort(port);
connector.setURIEncoding("UTF-8");
// 构建Engine
Engine engine = new StandardEngine();
engine.setDefaultHost(hostname);
// 构建Host
Host host = new StandardHost();
host.setName(hostname);
// 构建Context
String contextPath = "";
Context context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());// 生命周期监听器
// 然后按照server.xml,一层层把子节点添加到父节点
host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);
// service在getServer时就被添加到server节点了
// tomcat是一个servlet,设置路径与映射
tomcat.addServlet(contextPath,"dispatcher",new DispatcherServlet());
context.addServletMappingDecoded("/client/*","dispatcher");
try {
tomcat.start();// 启动tomcat
tomcat.getServer().await();// 接受请求
}catch (LifecycleException e){
e.printStackTrace();
}
}
}
HttpServerHandler.java
所有http请求交给HttpServerHandler处理,即服务消费端的远程调用
public class DispatcherServlet extends HttpServlet{
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// 方便后期在此拓展服务
new HttpServerHandler().handler(req, resp);
}
}
public class HttpServerHandler {
public void handler(HttpServletRequest req, HttpServletResponse resp){
try{
// Http请求流转为对象
InputStream is = req.getInputStream();
ObjectInputStream ois = new ObjectInputStream(is);
Invocation invocation = (Invocation)ois.readObject();
// 寻找注册中心的实现类,通过反射执行方法
Class implClass = Register.get(new URL("localhost",8080),invocation.getInterfaceName());
Method method = implClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
String result = (String) method.invoke(implClass.newInstance(),invocation.getParams());
// 将结果返回
IOUtils.write(result,resp.getOutputStream());
}catch (Exception e){
e.printStackTrace();
}
}
}
注意: URL一定要重写equals与hashCode方法,否则Register.get(new URL("localhost",8080),invocation.getInterfaceName());
时为null。
消费端
consumer .java
public class consumer {
public static void main(String[] args) {
// 调用哪个方法
Invocation invocation = new Invocation(
HelloService.class.getName(),
"sayHello",
new Object[]{"yukang"},
new Class[]{String.class});
// 发现服务器
String result = new HttpClient().post("localhost",8080,invocation);
System.out.println(result);
}
}
HttpClient.java
public class HttpClient {
/**
* 远程方法调用
* @param hostname
* @param port
* @param invocation
* @return
*/
public String post(String hostname, Integer port, Invocation invocation){
try {
// 进行http连接
URL url = new URL("http",hostname,port,"/client/");
HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("POST");
connection.setDoOutput(true);// 必填项
// 将对象写入输出流
OutputStream os = connection.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(os);
oos.writeObject(invocation);
oos.flush();
oos.close();
// 将输入流转为字符串(此处可是java对象)
InputStream is = connection.getInputStream();
return IOUtils.toString(is);
}catch (MalformedURLException e){
e.printStackTrace();
}catch (IOException e){
e.printStackTrace();
}
return null;
}
}
测试
先启动服务端
再启动服务端
优化
dubbo是直接引入接口jar包,调用接口方法就可以获取结果,于是使用到了动态代理返回一个代理对象。
动态代理
ProxyFactory.java
public class ProxyFactory<T> {
public static <T> T getProxy(Class interfaceClass){
return (T)Proxy.newProxyInstance(
interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 调用哪个方法
Invocation invocation = new Invocation(
interfaceClass.getName(),
method.getName(),
args,
new Class[]{String.class});
// 模拟负载均衡,随机获取服务器
URL url = Register.random(interfaceClass.getName());
// 调用
HttpClient httpClient = new HttpClient();
return httpClient.post(url.getHostname(),url.getPort(),invocation);
}
});
}
}
以文本形式实现注册中心
因为消费端与服务端是两个进程,消费端是获取不到服务端的REGISTER
的,所以需要在服务端注册时将URL写入文本,因然后在消费端根据interfaceName随机调度已发布服务的服务器地址。
Register.java
public class Register {
private static Map<String,Map<URL,Class>> REGISTER = new HashMap<String, Map<URL, Class>>();
/**
* 注册服务(暴露接口)
* @param url
* @param interfaceName
* @param implClass
*/
public static void regist(URL url,String interfaceName,Class implClass){
Map<URL,Class> map = new HashMap<URL, Class>();
map.put(url,implClass);
REGISTER.put(interfaceName,map);
// 写入文本
saveFile();
}
/**
* 从注册中心获取实现类(发现服务)
* @param url
* @param interfaceName
* @return
*/
public static Class get(URL url,String interfaceName){
return REGISTER.get(interfaceName).get(url);
}
/**
* 模拟负载均衡,随机获取服务器
* @param interfaceName
* @return
*/
public static URL random(String interfaceName){
REGISTER = getFile();
if(REGISTER != null){
return REGISTER.get(interfaceName).keySet().iterator().next();
}
return null;
}
/**
* 写入文本
*/
public static void saveFile(){
try {
FileOutputStream fos = new FileOutputStream("D://register.text");
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(REGISTER);
oos.flush();
oos.close();
}catch (IOException e){
e.printStackTrace();
}
}
/**
* 获取文本
* @return
*/
public static Map<String,Map<URL,Class>> getFile(){
try {
FileInputStream fis = new FileInputStream("D://register.text");
ObjectInputStream ois = new ObjectInputStream(fis);
return (Map<String,Map<URL,Class>>)ois.readObject();
}catch (IOException | ClassNotFoundException e){
e.printStackTrace();
}
return null;
}
public static Class getClass(URL url,String interfaceName){
REGISTER = getFile();
if(REGISTER != null){
return REGISTER.get(interfaceName).get(url);
}
return null;
}
}
优化后的消费端
consumer.java
public class consumer {
public static void main(String[] args) {
// 此处模拟spring容器
HelloService service = ProxyFactory.getProxy(HelloService.class);
String result = service.sayHello("yukang");
System.out.println(result);
}
}
netty实现?
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/128109.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...