「从零单排canal 05」 server模块源码解析

「从零单排canal 05」 server模块源码解析

基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal

本文将对canal的server模块进行分析,跟之前一样,我们带着几个问题来看源码:

  • CanalServer有几种使用方式?
  • 控制台Admin、客户端client是如何与CanalServer交互的?
  • CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?
  • Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?

server模块内的结构如下:

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

主要分为了三个包:

  • admin包:

这个包的CanalAdmin接口定义了canalServer上暴露给canal-admin控制台使用的一些服务接口。

上一篇deployer模块解析中提到的CanalAdminController就是实现了CanalAdmin接口(把这个接口的实现放在deployer模块是挺奇怪的)。 Admin包中使用了netty作为服务端(CanalAdminWithNetty类中实现),接受控制台Admin的请求,返回当前canalServer的一些运行状态。

  • server包:

server模块的核心包,本文重点解析的部分,需要了解CanalServerWithEmbedded 和CanalServerWithNetty。

  • spi包:

定义了canalServer的监控内容 通过spi实现,比如项目中的Prometheus子模块实现了监控能力,我们不展开分析。

1.从CanalServer的架构说起

CanalServer目前支持两种模式:

  • serverMode = tcp的Server-Client模式
  • serverMode = kafak 或 rocketMQ 的 Server-MQ-Client模式

为了大家能充分理解canalServer的结构,这里精心制作了一个canalServer的架构图(如果觉得这图不错,给本文点个赞吧)。

1.1 Server-Client模式

架构如图所示:

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

我们可以清楚的看到Server模块中各个模块的关系与能力:

  • CanalServerWithEmbedde维护了具体的instance任务,负责对binlog进行订阅、过滤、缓存,就是之前的文章介绍过的parser-sink-store的方式。
  • CanalServerWithNetty作为服务端,接收CanalClient的请求,将binlog的消息发送给client。
  • CanalAdminWithNetty作为admin的服务器,接收控制台Admin的控制操作、查询状态操作等,启停或显示当前CanalServer以及instance的状态。

1.2 Server-MQ-Client模式

架构如图所示:

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

主体部分与Server-client模式一致,主要区别如下:

  • 不需要CanalServerWithNetty,改为CanalMQProducer投递消息给消息队列
  • 不使用CanalClient,改为MqClient获取消息队列的消息进行消费

这种模式相比于Server-client模式

  • 下游解耦,利用消息队列的特性,可以支持多个客户端广播消费、集群消费、重复消费等
  • 会增加系统的复杂度,增加一些延迟

具体模式的选择,需要根据具体的使用场景来决定。

2.server包

admin包和spi包都不属于核心逻辑,因此我们重点关注server包的代码。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

我们看到,server包下面分为了embedded包、exception包、netty包和几个接口类。

其中,最顶层的设计就要从CanalServer接口入手。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

它的实现类有两个,CanalServerWithEmbedded 和 CanalServerWithNetty。

它们之间的区别官方文档给了一些说明。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

那么,对于官方文档中提到的Embedded(嵌入式)的自主开发是怎么使用呢?

跟我们上面提到的Server-Client模式和Server-MQ-Client模式完全不同,采用了一种无server的架构,如下图所示。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

我们可以看到,这种模式没有了Canal-Server,直接在自己的应用中引入canal,然后使用CanalServerWithEmbedded进行数据抓取和订阅。

当然,这种方式开发成本有点高,一般也不会去这样使用。

对于CanalServerWithEmbedded 和 CanalServerWithNetty,官方文档里面实际上没有解释的特别到位,只讲了区别,没有讲联系。

这两个实现类除了官方文档中说明的区别之外,还有很大的联系。

可以看看我们上文介绍的架构图,对于Server-Client模式下的模块联系

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

实际上,真正的执行逻辑是在CanalServerWithEmbedded中的,CanalServerWithNetty中持有了CanalServerWithEmbedded对象,委托embedded进行相关逻辑处理,CanalServerWithNetty更多的作用是充当服务端与CanalClient进行交互。

3. CanalServerWithNetty类

下面,我们先看看CanalServerWithNetty类。

3.1 单例构建

使用 private构造器 + 静态内部类 来实现一个单例模式,保证了一个CanalServer内部只有一个CanalServerWithNetty。

同时,我们能看到内部持有一个CanalServerWithEmbedded对象,用来处理相关请求,验证了我们上面的说明。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

3.2 启动逻辑 start()

源码如下:

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

主要流程如下:

  • 启动embeddedServer
  • 创建bootstrap实例,设置netty相关配置

参数NioServerSocketChannelFactory也是Netty的API,接受2个线程池参数,第一个线程池是Accept线程池,第二个线程池是woker线程池,Accept线程池接收到client连接请求后,会将代表client的对象转发给worker线程池处理。这里属于netty的知识,不熟悉的可以暂时不必深究,简单认为netty使用线程来处理客户端的高并发请求即可。

  • 构造对应的pipeline,包括解码处理、身份验证、创建netty的 seesionHandler(真正处理客户端请求,seesionHandler的实现是核心逻辑)

pipeline实际上就是netty对客户端请求的处理器链,可以类比JAVA EE编程中Filter的责任链模式,上一个filter处理完成之后交给下一个filter处理,只不过在netty中,不再是filter,而是ChannelHandler。

  • 启动netty,监听port端口,然后客户端对 这个端口的请求可以被接收到

对于 netty的相关知识 ,本文 不深入展开,简单理解 为一个高性能服务器即可,可以监听 端口请求,并 进行相应的处理。

重点在于sessionHandler的处理。

3.3 逻辑分发SessionHandler类

canalServer的处理逻辑显然都在sessionHandler里面,而这个handler在构建时,传入了embeddedServer。

前面我们提过,serverWithNetty的处理逻辑是委派给embeddedServer的,所以这里就非常顺理成章了,让handler维护embeddedServer实例,进行逻辑处理。

sessionHandler继承了netty的SimpleChannelHandler类,重写了messageReceived方法,接收到不同请求后,委托embeddedServer用不同方法进行处理 。

这个方法里面的代码非常冗长,而本质都是委托给embeddedServer去处理,因此,我们看下主干逻辑即可。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

可以看到,根据不同的packet类型,最终都是委托给embeddedServer进行处理,这里只是做一个逻辑的判断和分发。

3.4 CanalServerWithNetty小结

到此,我们已经了解了CanalServerWithNetty是如何启动的。

并且,它的主要定位就是充当服务器,接收客户端的请求,然后做消息分发,委托给CanalServerEmbedded进行处理。

下面,我们来看下CanalServerEmbedded的相关实现。

4. CanalServerEmbedded类

4.1 基本认识

  • 非完全单例模式,这里使用public的构造器,用户还是有机会自己new对象出来的,应用是用来独立引入进行开发的时候使用。
  • 维护了instance的对象容器
  • 继承了CanalServer和CanalService接口
「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

CannalServer接口其实就是就是start()和stop()方法,没有特别的地方,主要是start()配置了一个MigrateMap.makeComputingMap,

当需要某个instance的时候,就会调用apply方法用instanceGenerator创建对应的instance。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

我们重点看下CanalService接口定义的方法。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

每个方法的入参都带来clientIdentity,这个是客户端的身份标示

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

目前canal只支持一个客户端对一个instance进行订阅,clientId全部写死为1001,据说以后可能会支持多用户订阅。

了解CanalService定义的方法在CanalServerEmbedded中如何实现,基本也就能看清CanalServerEmbedded的全貌了。

尤其是,你能理解官网wiki中介绍的canal核心功能——异步消费流式api(get/ack/rollback协议) 设计。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

4.2 subscribe方法

主要步骤:

  • 根据客户端标识clientIdentity中的destination,找到对应的instance
  • 通过instance的metaManager记录下当前这个客户端在订阅
  • 通过instace的metaManage获取当前订阅binlog的position位置。如果是第一次订阅,那么metaManage没有position信息,就从eventStore获取第一个binlog的position,然后更新到metaManager
  • 通知下订阅关系变化
「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

这里需要注意一下metaManager,这是一个接口,有多种实现方式,包括基于内存、基于文件、基于内存+zookeeper混合、基于zookeeper等,都在meta模块中,这里就简单了解下概念即可。

  • MemoryMetaManager:位点信息保存在内存中
  • ZookeeperMetaManage:位点信息保存在zk上
  • PeriodMixedMetaManager:前面两种的混合,保存在内存中,然后位点信息定期刷新到zk上
「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

我们在集群模式下,default-instance.xml使用的是基于PeriodMixedMetaManager的实现。

4.3 unsubscribe方法

这个方法比较简单,就不放源码了。

就是找到instance对应的metaManager,然后调用unsubscribe方法取消这个客户端的订阅。

需要注意的是,取消订阅,instance本身仍然是在运行的,可以有新的client来订阅这个instance。

4.4 getWithoutAck方法

先解释几个概念。

我们用的集群版canalServer,默认是使用PeriodMixedMetaManager来管理位点信息,也就是MemoryMetaManager + zookeeperMetaManager。

其中,对于客户端消费instance消息的情况,内部维护了一个对象MemoryClientIdentityBatch进行记录

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

回到这个方法来说,这个方法用于客户端获取binlog消息,大致流程如下:

  • 根据clientIdentity的destination获取对应的instance
  • 获取到流式数据中的最后一批获取的位置positionRanges(跟batchId有关联,就是上面那个map里面的)
  • 从cananlEventStore里面获取binlog,转化为event。一般是从最后的一个batchId位置开始,如果之前没有batchId,那么就从cursor记录的消费位点开始;如果cursor为空,那只能从eventStore的第一条消息开始。
  • event转化为entry,并生成新的batchId,组合成message返回给客户端

注意在eventStore获取event的时候,用户可以自己设置batchSize和超时时间timeout。为了尽量提高效率,一般一次获取一批binlog,而不是获取一条。这个批次的大小(batchSize)由客户端指定。同时客户端可以指定超时时间,在超时时间内,如果获取到了batchSize的binlog,会立即返回。 如果超时了还没有获取到batchSize指定的binlog个数,也会立即返回。特别的,如果没有设置超时时间,如果没有获取到binlog也立即返回。具体eventStore的获取逻辑,我们下次讲到这个模块再展开。

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

4.5 get方法

这个方法主要是用于客户端获取binlog消息,与getWithoutAck基本一致。

主要区别在于,客户端获取batch后,自动ack,这样相对来说肯定更快,但是无法保证可靠性。

在项目中看起来暂时没有使用,我们就不展开了。

4.6 ack方法

进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。

  • 从metaManager中移除batchId对应的记录
  • 记录已经成功消费到的binlog位置,以便下一次获取的时候可以从这个位置开始
  • 已经ack的数据,在eventStore中清除
「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

4.7 rollback

rollback有两个方法,回滚所有和回滚指定batchId,不过从源码来看,目前回滚指定指定batchId也是回滚所有。

回滚的本质,就是把所有还没ack的batchId都清空,流式api被get但是还没ack的消息会被重新get。

5.canalMQStarter

在第一节的架构模式中我们分析过了,在启动过程中,如果serverMode选择tcp,会启动canalServerWithNetty,如果serverMode选择了mq,就会启动cannalMQStarter。

所以从模块组成来说,canalMQStarter跟canalServerWithNetty是比较相似的。

canalMQStarter也是委托embeddedCanal做处理,同时委托CanalMQProducer把消息投递到mq集群。

canalServerWithNetty也是委托embeddedCanal做处理,然后通过netty来跟canal-client做交互。

如果我们以后应用中要内嵌embeddedCanal,完全可以参照canalMQStarter和canalServerWithNetty的模式来写。

主要组成如下:

  • 工作线程池executorService,对每个instance起一个worker线程
  • canalMQWorks,记录了destination(instance的标识)和worker线程的关系
  • CanalServerWithEmbedded
  • CanalMQProducer投递mq消息
「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

5.1 start方法

这个方法就是前面canalStarter类里面的start()方法中,对CanalMQStarter.start()的调用。

具体做了三件事情:

  • 获取CanalServerWithEmbedded的单例对象
  • 对应每个instance启动一个worker线程CanalMQRunnable
  • 注册ShutdownHook,退出时关闭线程池和mqProducer
「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

这里主要看看CanalMQRunnable做了些什么。

5.2 CanalMQRunnable

这是一个内部类,就是看看worker里面做了什么

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

只有一个worker方法,主要逻辑非常清晰:

  • 给自己创建一个身份标识,作为client
  • 根据destination获取对应instance,如果没有就sleep,等待产生(比如从别的server那边HA过来一个instance)
  • 构建一个MQ的destination对象,加载相关mq的配置信息,用作mqProducer的入参
  • 在embeddedCanal中注册这个订阅客户端
  • 开始运行,并通过embededCanal进行流式get/ack/rollback协议,进行数据消费
「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

「从零单排canal 05」 server模块源码解析(1.1.5-sp版本)

 

6.总结

回到开头的几个问题,相信文中都已经做了解答。

  • CanalServer有几种使用方式?

可以独立部署(推荐),可以使用Server-Client模式 和 Server-MQ-Client模式两种。

可以内嵌部署开发(embedded,难度较高)。

  • 控制台Admin、客户端client是如何与CanalServer交互的?

控制台Admin通过CanalAdminWithNetty与服务端交互 客户端client通过CanalServerWithNetty与服务端交互。

  • CanalServerWithNetty和CanalServerWithEmbedded究竟有什么关系?

CanalServerWithEmbedded是真正核心逻辑(parser-sink-store)处理的地方 。CanalServerWithNetty持有CanalServerWithEmbedded对象,接收client的请求然后转发给CanalServerWithEmbedded对象处理。

  • Canal事件消费的特色协议,异步流式api(get/ack/rollback协议)的设计是如何实现的?

CanalServerWithEmbedded集成了CanalService接口,实现了具体的get/ack/rollback协议

 

都看到最后了,原创不易,点个关注,点个赞吧~

文章持续更新,可以微信搜索「阿丸笔记 」第一时间阅读,回复关键字【学习】有我准备的一线大厂面试资料。

知识碎片重新梳理,构建Java知识图谱:
github.com/saigu/JavaK…(历史文章查阅非常方便)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/2612.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • System.setProperty(),System.setProperty

    System.setProperty(),System.setPropertySystem.getProperties()获得所有的系统变量System.getProperty(“os.name”)获取指定的系统变量(获取系统=windows8.1)System.setProperty(“koow”,”123″)储存在系统变量中,变量名为koow,值为123System.getProperty(“koow”)获取指定的系统变量(获取koow=12

  • javascript typescript_typescript python

    javascript typescript_typescript python前言:无论在学习什么语言的时候,我们都需要明白其该怎么样去定义一个变量或者Function,那么今天我们来看看TypeScript的数据类型。文章目录:一.Ts与Js的区别二.Ts的数据![在这里插入图片描述](https://img-blog.csdnimg.cn/8904446afa764db282b731721429ebda.png)三.数据类型应用1.any类型:2.number类型:3.string类型:4.Array类型:(1).[]:(2).数组泛型:一.Ts与Js的区别众所周知:Jav

  • laravel 循环中子元素使用&符号嵌入到父级,经典版

    laravel 循环中子元素使用&符号嵌入到父级,经典版

  • 有讨厌jasper的吗(can not find the tag library)

    我的解决方法是创建一个myPackage的包,把Person类放入然后<%@pageimport=“myPackage.Person”%>就行了,好像JSP就是只能导在包中的类。。。

  • 数据结构 哈希表设计

    实验6哈希表设计一、实验目的熟练掌握哈希表的构造方法,深刻理解哈希表与其他结构表的实质性差别。 二、实验内容程序的功能是对一批关键字集合采用除留余数法和线性探测再散列的方法解决冲突来建立相应的哈希表和完成查找过程及平均查找长度的计算。【问题描述】    研究哈希(HAXI)表查找技术的两个重要问题是:构造HAXI函数和处理冲突。现在要求针对某个数据集合中的关键字设

  • uwsgi停止[通俗易懂]

    uwsgi停止[通俗易懂]uwsgi停止和nginx配置uwsgi停止nginx配置uwsgi停止必须在uwsgi.ini中配置好pidfile=绝对路径///uwsgi.pid这样才有pid文件uwsgi–stopuwsgi.pid这样就可以停止了有一点需要注意:如果没有uwsgi.pid又需要停止uwsgi服务可以netstat…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号