「从零单排canal 04」 启动模块deployer源码解析

「从零单排canal 04」 启动模块deployer源码解析

基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal

本文将对canal的启动模块deployer进行分析。

Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.

「从零单排canal 04」 启动模块deployer源码解析

 

模块内的类如下:

「从零单排canal 04」 启动模块deployer源码解析

 

为了能带着目的看源码,以几个问题开头,带着问题来一起探索deployer模块的源码。

  • CanalServer启动过程中配置如何加载?
  • CanalServer启动过程中涉及哪些组件?
  • 集群模式的canalServer,是如何实现instance的HA呢?
  • 每个canalServer又是怎么获取admin上的配置变更呢?

1.入口类CanalLauncher


这个类是整个canal-server的入口类。负责配置加载和启动canal-server。

主流程如下:

  • 加载canal.properties的配置内容
  • 根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
  • 如果是admin控制,使用PlainCanalConfigClient获取远程配置 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
  • 核心是用canalStarter.start()启动
  • 使用CountDownLatch保持主线程存活
  • 收到关闭信号,CDL-1,然后关闭配置更新线程池,优雅退出
  1 public static void main(String[] args) {
  2 
  3     try {
  4 
  5         //note:设置全局未捕获异常的处理
  6 
  7         setGlobalUncaughtExceptionHandler();
  8 
  9         /**
 10 
 11          * note:
 12 
 13          * 1.读取canal.properties的配置
 14 
 15          * 可以手动指定配置路径名称
 16 
 17          */
 18 
 19         String conf = System.getProperty("canal.conf", "classpath:canal.properties");
 20 
 21         Properties properties = new Properties();
 22 
 23         if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
 24 
 25             conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
 26 
 27             properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
 28 
 29         } else {
 30 
 31             properties.load(new FileInputStream(conf));
 32 
 33         }
 34 
 35         final CanalStarter canalStater = new CanalStarter(properties);
 36 
 37         String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
 38 
 39         /**
 40 
 41          * note:
 42 
 43          * 2.根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
 44 
 45          */
 46 
 47         if (StringUtils.isNotEmpty(managerAddress)) {
 48 
 49             String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
 50 
 51             //省略一部分。。。。。。
 52           
 53 
 54             /**
 55 
 56              * note:
 57 
 58              * 2.1使用PlainCanalConfigClient获取远程配置
 59 
 60              */
 61 
 62             final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
 63 
 64                     user,
 65 
 66                     passwd,
 67 
 68                     registerIp,
 69 
 70                     Integer.parseInt(adminPort),
 71 
 72                     autoRegister,
 73 
 74                     autoCluster);
 75 
 76             PlainCanal canalConfig = configClient.findServer(null);
 77 
 78             if (canalConfig == null) {
 79 
 80                 throw new IllegalArgumentException("managerAddress:" + managerAddress
 81 
 82                         + " can't not found config for [" + registerIp + ":" + adminPort
 83 
 84                         + "]");
 85 
 86             }
 87 
 88             Properties managerProperties = canalConfig.getProperties();
 89 
 90             // merge local
 91 
 92             managerProperties.putAll(properties);
 93 
 94             int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
 95 
 96                     CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
 97 
 98                     "5"));
 99 
100             /**
101 
102              * note:
103 
104              * 2.2 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法)
105 
106              */
107 
108             executor.scheduleWithFixedDelay(new Runnable() {
109 
110                 private PlainCanal lastCanalConfig;
111 
112                 public void run() {
113 
114                     try {
115 
116                         if (lastCanalConfig == null) {
117 
118                             lastCanalConfig = configClient.findServer(null);
119 
120                         } else {
121 
122                             PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
123 
124                             /**
125 
126                              * note:
127 
128                              * 2.3 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
129 
130                              */
131 
132                             if (newCanalConfig != null) {
133 
134                                 // 远程配置canal.properties修改重新加载整个应用
135 
136                                 canalStater.stop();
137 
138                                 Properties managerProperties = newCanalConfig.getProperties();
139 
140                                 // merge local
141 
142                                 managerProperties.putAll(properties);
143 
144                                 canalStater.setProperties(managerProperties);
145 
146                                 canalStater.start();
147 
148                                 lastCanalConfig = newCanalConfig;
149 
150                             }
151 
152                         }
153 
154                     } catch (Throwable e) {
155 
156                         logger.error("scan failed", e);
157 
158                     }
159 
160                 }
161 
162             }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
163 
164             canalStater.setProperties(managerProperties);
165 
166         } else {
167 
168             canalStater.setProperties(properties);
169 
170         }
171 
172         canalStater.start();
173 
174         //note: 这样用CDL处理和while(true)有点类似
175 
176         runningLatch.await();
177 
178         executor.shutdownNow();
179 
180     } catch (Throwable e) {
181 
182         logger.error("## Something goes wrong when starting up the canal Server:", e);
183 
184     }
185 
186 }

 

2.启动类CanalStarter


从上面的入口类,我们可以看到canal-server真正的启动逻辑在CanalStarter类的start方法。

这里先对三个对象进行辨析:

  • CanalController:是canalServer真正的启动控制器
  • canalMQStarter:用来启动mqProducer。如果serverMode选择了mq,那么会用canalMQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq
  • CanalAdminWithNetty:这个不是admin控制台,而是对本server启动一个netty服务,让admin控制台通过请求获取当前server的信息,比如运行状态、正在本server上运行的instance信息等

start方法主要逻辑如下:

  • 根据配置的serverMode,决定使用CanalMQProducer或者canalServerWithNetty
  • 启动CanalController
  • 注册shutdownHook
  • 如果CanalMQProducer不为空,启动canalMQStarter(内部使用CanalMQProducer将消息投递给mq)
  • 启动CanalAdminWithNetty做服务器
  1 public synchronized void start() throws Throwable {
  2 
  3     String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
  4 
  5     /**
  6 
  7      * note
  8 
  9      * 1.如果canal.serverMode不是tcp,加载CanalMQProducer,并且启动CanalMQProducer
 10 
 11      * 回头可以深入研究下ExtensionLoader类的相关实现
 12 
 13      */
 14 
 15     if (!"tcp".equalsIgnoreCase(serverMode)) {
 16 
 17         ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);
 18 
 19         canalMQProducer = loader
 20 
 21                 .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);
 22 
 23         if (canalMQProducer != null) {
 24 
 25             ClassLoader cl = Thread.currentThread().getContextClassLoader();
 26 
 27             Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());
 28 
 29             canalMQProducer.init(properties);
 30 
 31             Thread.currentThread().setContextClassLoader(cl);
 32 
 33         }
 34 
 35     }
 36 
 37     //note 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?)
 38 
 39     if (canalMQProducer != null) {
 40 
 41         MQProperties mqProperties = canalMQProducer.getMqProperties();
 42 
 43         // disable netty
 44 
 45         System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
 46 
 47         if (mqProperties.isFlatMessage()) {
 48 
 49             // 设置为raw避免ByteString->Entry的二次解析
 50 
 51             System.setProperty("canal.instance.memory.rawEntry", "false");
 52 
 53         }
 54 
 55     }
 56 
 57     controller = new CanalController(properties);
 58 
 59     //note 2.启动canalController
 60 
 61     controller.start();
 62 
 63     //note 3.注册了一个shutdownHook,系统退出时执行相关逻辑
 64 
 65     shutdownThread = new Thread() {
 66 
 67         public void run() {
 68 
 69             try {
 70 
 71                 controller.stop();
 72 
 73                 //note 主线程退出
 74 
 75                 CanalLauncher.runningLatch.countDown();
 76 
 77             } catch (Throwable e) {
 78 
 79 
 80             } finally {
 81 
 82             }
 83 
 84         }
 85 
 86     };
 87 
 88     Runtime.getRuntime().addShutdownHook(shutdownThread);
 89 
 90     //note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。
 91 
 92     if (canalMQProducer != null) {
 93 
 94         canalMQStarter = new CanalMQStarter(canalMQProducer);
 95 
 96         String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
 97 
 98         canalMQStarter.start(destinations);
 99 
100         controller.setCanalMQStarter(canalMQStarter);
101 
102     }
103 
104     // start canalAdmin
105 
106     String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT);
107 
108     //note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器
109 
110     if (canalAdmin == null && StringUtils.isNotEmpty(port)) {
111 
112         String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
113 
114         String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
115 
116         CanalAdminController canalAdmin = new CanalAdminController(this);
117 
118         canalAdmin.setUser(user);
119 
120         canalAdmin.setPasswd(passwd);
121 
122         String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP);
123 
124         CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance();
125 
126         canalAdminWithNetty.setCanalAdmin(canalAdmin);
127 
128         canalAdminWithNetty.setPort(Integer.parseInt(port));
129 
130         canalAdminWithNetty.setIp(ip);
131 
132         canalAdminWithNetty.start();
133 
134         this.canalAdmin = canalAdminWithNetty;
135 
136     }
137 
138     running = true;
139 
140 }

 

3.CanalController


前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。

这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。

3.1 从构造器开始了解

整体初始化的顺序如下:

  • 构建PlainCanalConfigClient,用于用户远程配置的获取
  • 初始化全局配置,顺便把instance相关的全局配置初始化一下
  • 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
  • 初始化zkClient
  • 初始化ServerRunningMonitors,作为instance 运行节点控制
  • 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)

这里有几个机制要详细介绍一下。

3.1.1 CanalServer两种模式

canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。

在构造器中初始化代码部分如下:

 1 // 3.准备canal server
 2 
 3 //note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq
 4 
 5 // 是不需要这个netty的)
 6 
 7 ip = getProperty(properties, CanalConstants.CANAL_IP);
 8 
 9 //省略一部分。。。
10 
11 embededCanalServer = CanalServerWithEmbedded.instance();
12 
13 embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
14 
15 int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
16 
17 //省略一部分。。。
18 
19 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
20 
21 if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
22 
23     canalServer = CanalServerWithNetty.instance();
24 
25     canalServer.setIp(ip);
26 
27     canalServer.setPort(port);
28 
29 }

 

embededCanalServer:类型为CanalServerWithEmbedded

canalServer:类型为CanalServerWithNetty

二者有什么区别呢?

都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种类型的实现,canal官方文档有以下描述:

「从零单排canal 04」 启动模块deployer源码解析

 

说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。

如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。

因此,在构造器中,我们看到,

用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,

而ip和port被设置到CanalServerWithNetty中。

关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。

 

3.1.2 ServerRunningMonitor

在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。

ServerRunningMonitor是做什么的呢?

我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。

 1 /**
 2 
 3 * 针对server的running节点控制
 4 
 5 */
 6 
 7 public class ServerRunningMonitor extends AbstractCanalLifeCycle {
 8 
 9     private static final Logger        logger       = LoggerFactory.getLogger(ServerRunningMonitor.class);
10 
11     private ZkClientx                  zkClient;
12 
13     private String                     destination;
14 
15     private IZkDataListener            dataListener;
16 
17     private BooleanMutex               mutex        = new BooleanMutex(false);
18 
19     private volatile boolean           release      = false;
20 
21     // 当前服务节点状态信息
22 
23     private ServerRunningData          serverData;
24 
25     // 当前实际运行的节点状态信息
26 
27     private volatile ServerRunningData activeData;
28 
29     private ScheduledExecutorService   delayExector = Executors.newScheduledThreadPool(1);
30 
31     private int                        delayTime    = 5;
32 
33     private ServerRunningListener      listener;
34 
35     public ServerRunningMonitor(ServerRunningData serverData){
36 
37         this();
38 
39         this.serverData = serverData;
40 
41     }
42         //。。。。。
43 
44 }

 

在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。

ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。

主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。

具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。

  1 new Function<String, ServerRunningMonitor>() {
  2 
  3     public ServerRunningMonitor apply(final String destination) {
  4 
  5         ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
  6 
  7         runningMonitor.setDestination(destination);
  8 
  9         runningMonitor.setListener(new ServerRunningListener() {
 10 
 11             /**
 12 
 13              * note
 14 
 15              * 1.内部调用了embededCanalServer的start(destination)方法。
 16 
 17              * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,
 18 
 19              * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。
 20 
 21              * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。
 22 
 23              *
 24 
 25              * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination
 26 
 27              */
 28 
 29             public void processActiveEnter() {
 30 
 31                //省略具体内容。。。
 32             }
 33 
 34             /**
 35 
 36              * note
 37 
 38              * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination
 39 
 40              * 2.停止embedeCanalServer的destination
 41 
 42              */
 43 
 44             public void processActiveExit() {
 45 
 46                 //省略具体内容。。。
 47 
 48             }
 49 
 50             /**
 51 
 52              * note
 53 
 54              * 在Canalinstance启动之前,destination注册到ZK上,创建节点
 55 
 56              * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
 57 
 58              * 此方法会在processActiveEnter()之前被调用
 59 
 60              */
 61 
 62             public void processStart() {
 63 
 64                 //省略具体内容。。。
 65 
 66             }
 67 
 68             /**
 69 
 70              * note
 71 
 72              * 在Canalinstance停止前,把ZK上节点删除掉
 73 
 74              * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。
 75 
 76              * 此方法会在processActiveExit()之前被调用
 77 
 78              */
 79 
 80             public void processStop() {
 81 
 82                 //省略具体内容。。。
 83             }
 84 
 85         });
 86 
 87         if (zkclientx != null) {
 88 
 89             runningMonitor.setZkClient(zkclientx);
 90 
 91         }
 92 
 93         // 触发创建一下cid节点
 94 
 95         runningMonitor.init();
 96 
 97         return runningMonitor;
 98 
 99     }
100 
101 }

 

3.2 canalController的start方法

具体运行逻辑如下:

  • 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
  • 先启动embededCanalServer(会启动对应的监控)
  • 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
  • 如果cannalServer不为空,启动canServer (canalServerWithNetty)

这里需要注意,canalServer什么时候为空?

如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。

所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。

  1 public void start() throws Throwable {
  2 
  3     // 创建整个canal的工作节点
  4 
  5     final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);
  6 
  7     initCid(path);
  8 
  9     if (zkclientx != null) {
 10 
 11         this.zkclientx.subscribeStateChanges(new IZkStateListener() {
 12 
 13             public void handleStateChanged(KeeperState state) throws Exception {
 14 
 15             }
 16 
 17             public void handleNewSession() throws Exception {
 18 
 19                 initCid(path);
 20 
 21             }
 22 
 23             @Override
 24 
 25             public void handleSessionEstablishmentError(Throwable error) throws Exception{
 26 
 27                 logger.error("failed to connect to zookeeper", error);
 28 
 29             }
 30 
 31         });
 32 
 33     }
 34 
 35     // 先启动embeded服务
 36 
 37     embededCanalServer.start();
 38 
 39     // 尝试启动一下非lazy状态的通道
 40 
 41     for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
 42 
 43         final String destination = entry.getKey();
 44 
 45         InstanceConfig config = entry.getValue();
 46 
 47         // 创建destination的工作节点
 48 
 49         if (!embededCanalServer.isStart(destination)) {
 50 
 51             // HA机制启动
 52 
 53             ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
 54 
 55             if (!config.getLazy() && !runningMonitor.isStart()) {
 56 
 57                 runningMonitor.start();
 58 
 59             }
 60 
 61         }
 62 
 63         //note:为每个instance注册一个配置监视器
 64 
 65         if (autoScan) {
 66 
 67             instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
 68 
 69         }
 70 
 71     }
 72 
 73     if (autoScan) {
 74 
 75         //note:启动线程定时去扫描配置
 76 
 77         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
 78 
 79         //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
 80 
 81         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
 82 
 83             if (!monitor.isStart()) {
 84 
 85                 monitor.start();
 86 
 87             }
 88 
 89         }
 90 
 91     }
 92 
 93     // 启动网络接口
 94 
 95     if (canalServer != null) {
 96 
 97         canalServer.start();
 98 
 99     }
100 
101 }

 

我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。

入口在runningMonitor.start()。

  • 如果zkClient != null,就用zk进行HA启动
  • 否则,就直接processActiveEnter启动,这个我们前面已经分析过了
 1 public synchronized void start() {
 2 
 3     super.start();
 4 
 5     try {
 6 
 7         /**
 8 
 9          * note
10 
11          * 内部会调用ServerRunningListener的processStart()方法
12 
13          */
14 
15         processStart();
16 
17         if (zkClient != null) {
18 
19             // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start
20 
21             String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
22 
23             zkClient.subscribeDataChanges(path, dataListener);
24 
25             initRunning();
26 
27         } else {
28 
29             /**
30 
31              * note
32 
33              * 内部直接调用ServerRunningListener的processActiveEnter()方法
34 
35              */
36 
37             processActiveEnter();// 没有zk,直接启动
38 
39         }
40 
41     } catch (Exception e) {
42 
43         logger.error("start failed", e);
44 
45         // 没有正常启动,重置一下状态,避免干扰下一次start
46 
47         stop();
48 
49     }
50 
51 }

 

重点关注下HA启动方式,一般 我们都采用这种模式进行。

在集群模式下,可能会有多个canal server共同处理同一个destination,

在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。

同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!

启动的重点还是在initRuning()。

利用zk来保证集群中有且只有 一个instance任务在运行。

  • 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
  • 尝试创建临时节点。
  • 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
  • 如果创建成功,就说明没有其他server启动这个instance,可以创建
 1 private void initRunning() {
 2     if (!isStart()) {
 3         return;
 4     }
 5 
 6 
 7     //note: 还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running
 8     String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
 9     // 序列化
10     byte[] bytes = JsonUtils.marshalToByte(serverData);
11     try {
12         mutex.set(false);
13         /**
14          * note:
15          * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。
16          * 此时会抛出ZkNodeExistsException,进入catch代码块。
17          */
18         zkClient.create(path, bytes, CreateMode.EPHEMERAL);
19         /**
20          * note:
21          * 如果创建成功,就开始触发启动事件
22          */
23         activeData = serverData;
24         processActiveEnter();// 触发一下事件
25         mutex.set(true);
26         release = false;
27     } catch (ZkNodeExistsException e) {
28         /**
29          * note:
30          * 如果捕获异常,表示创建失败。
31          * 就根据临时节点路径查一下是哪个canal-sever创建了。
32          * 如果没有相关信息,马上重新尝试一下。
33          * 如果确实存在,就把相关信息保存下来
34          */
35         bytes = zkClient.readData(path, true);
36         if (bytes == null) {// 如果不存在节点,立即尝试一次
37             initRunning();
38         } else {
39             activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
40         }
41     } catch (ZkNoNodeException e) {
42         /**
43          * note:
44          * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。
45          */
46         zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点
47         initRunning();
48     }
49 }

 

那运行中的HA是如何实现的呢,我们回头看一下

zkClient.subscribeDataChanges(path, dataListener);
对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。

dataListener是在ServerRunningMonitor的构造方法中初始化的,

包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :

 
 1 public ServerRunningMonitor(){
 2     // 创建父节点
 3     dataListener = new IZkDataListener() {
 4         /**
 5          * note:
 6          * 当注册节点发生变化时,会自动回调这个方法。
 7          * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢?
 8          * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。
 9          * 可以 触发 HA。
10          */
11         public void handleDataChange(String dataPath, Object data) throws Exception {
12             MDC.put("destination", destination);
13             ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
14             if (!isMine(runningData.getAddress())) {
15                 mutex.set(false);
16             }
17 
18             if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active
19                 releaseRunning();// 彻底释放mainstem
20             }
21 
22             activeData = (ServerRunningData) runningData;
23         }
24 
25 
26         /**
27          * note:
28          * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去
29          */
30         public void handleDataDeleted(String dataPath) throws Exception {
31             MDC.put("destination", destination);
32             mutex.set(false);
33             if (!release && activeData != null && isMine(activeData.getAddress())) {
34                 // 如果上一次active的状态就是本机,则即时触发一下active抢占
35                 initRunning();
36             } else {
37                 // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作
38                 delayExector.schedule(new Runnable() {
39                     public void run() {
40                         initRunning();
41                     }
42                 }, delayTime, TimeUnit.SECONDS);
43             }
44         }
45     };
46 }
 

当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。

我们回想一下使用过程中,什么时候可能 改变节点当状态呢?

就是在控制台中,对canal-server中正在运行的 instance做”停止”操作时,改变了isActive,可以 触发 HA。

如下图所示

「从零单排canal 04」 启动模块deployer源码解析

 

4.admin的配置监控原理

我们现在采用admin做全局的配置控制。

那么每个canalServer是怎么监控配置的变化呢?

还记得上吗cananlController的start方法中对配置监视器的启动吗?

 1 if (autoScan) {
 2         //note:启动线程定时去扫描配置
 3         instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
 4         //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一
 5         for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
 6             if (!monitor.isStart()) {
 7                 monitor.start();
 8             }
 9         }
10     }

 

这个就是关键的配置监控。

我们来看deployer模块中的monitor包了。

「从零单排canal 04」 启动模块deployer源码解析

 

4.1 InstanceAction

是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。

 1 /**
 2 * config配置变化后的动作
 3 *
 4 */
 5 public interface InstanceAction {
 6 
 7 
 8     /**
 9      * 启动destination
10      */
11     void start(String destination);
12 
13 
14     /**
15      * 主动释放destination运行
16      */
17     void release(String destination);
18 
19 
20     /**
21      * 停止destination
22      */
23     void stop(String destination);
24 
25 
26     /**
27      * 重载destination,可能需要stop,start操作,或者只是更新下内存配置
28      */
29     void reload(String destination);
30 }

 

具体实现在canalController的构造器中实现了匿名类。

4.2 InstanceConfigMonitor

这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。

我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。

原理很简单。

  • 采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
  • 然后通过defaultAction去start
  • 这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。
 1 /**
 2 * 基于manager配置的实现
 3 *
 4 */
 5 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {
 6 
 7 
 8     private static final Logger         logger               = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);
 9     private long                        scanIntervalInSecond = 5;
10     private InstanceAction              defaultAction        = null;
11     /**
12      * note:
13      * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction
14      */
15     private Map<String, InstanceAction> actions              = new MapMaker().makeMap();
16     /**
17      * note:
18      * 每个instance对应的远程配置
19      */
20     private Map<String, PlainCanal>     configs              = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() {
21                                                                  public PlainCanal apply(String destination) {
22                                                                      return new PlainCanal();
23                                                                  }
24                                                              });
25     /**
26      * note:
27      * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
28      */
29     private ScheduledExecutorService    executor             = Executors.newScheduledThreadPool(1,
30                                                                  new NamedThreadFactory("canal-instance-scan"));
31 
32     private volatile boolean            isFirst              = true;
33     /**
34      * note:
35      * 拉取admin配置的client
36      */
37     private PlainCanalConfigClient      configClient;
38 //
39 }

 

5.总结

deployer模块的主要作用:

1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。

2)确定canal-server的启动方式:独立启动或者集群方式启动

3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA

4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置

5)启动canal server,监听客户端请求

这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。

都看到最后了,原创不易,点个关注,点个赞吧~

文章持续更新,可以微信搜索「阿丸笔记 」第一时间阅读,回复关键字【学习】有我准备的一线大厂面试资料。

知识碎片重新梳理,构建Java知识图谱:
github.com/saigu/JavaK…(历史文章查阅非常方便)

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/2613.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 遗传算法工具箱约束怎么输入_遗传算法中怎么添加约束条件

    遗传算法工具箱约束怎么输入_遗传算法中怎么添加约束条件前言网上有很多博客讲解遗传算法,但是大都只是“点到即止”,虽然给了一些代码实现,但也是“浅尝辄止”,没能很好地帮助大家进行扩展应用,抑或是进行深入的研究。这是我的开篇之作~之前没有写博客的习惯,一般是将笔记存本地,但久而久之发现回看不便,而且无法与大家交流和学习。现特此写下开篇之作,若有疏漏之处,敬请指正,谢谢!本文对遗传算法的原理进行梳理,相关代码是基于国内高校学生联合团队开源…

  • 排列组合公式的原理_有序排列组合公式

    排列组合公式的原理_有序排列组合公式绪论:加法原理、乘法原理#分类计数原理:做一件事,有n类办法,在第1类办法中有m1种不同的方法,在第2类办法中有m2种不同的方法,…,在第n类办法中有mn种不同的方法,那么完成这件事共有N=m1+m2+…+mn种不同的方法。分步计数原理:完成一件事,需要分成n个步骤,做第1步有m1种不同的方法,做第2步有m2种不同的方法,…,做第n步有mn种不同的方法,那么完成这件事共有N=m1×m2×⋯×mn种不同的方法。区别:分类计数原理是加法原理,不同的类加起来就是我要得到的总数;分步计数原理是乘法原理,是同一

    2022年10月21日
  • uni-app打包成安卓app步骤详解[通俗易懂]

    前置:开发环境AndroidStudio下载地址:AndroidStudio官网ORAndroidStudio中文社区HBuilderXApp离线SDK下载:最新android平台SDK下载3.1.10版本起需要申请Appkey,具体请点击链接正文:通过uni-app实现一套代码在微信小程序和安卓端app同时适配1.创建文件创建Demo文件,采用uni-app模板2.创建应用在https://dev.dcloud.net.cn/app页面创建相同名称的应用,并且获取

  • 软考中高项学员:2016年4月4日作业

    软考中高项学员:2016年4月4日作业

  • 软件免杀教程_EXE文件

    软件免杀教程_EXE文件攻击机:win7IP:192.168.32.134靶机:windowsserver2012(安装360、火绒)IP:192.168.32.133第一步:使用njRAT生产一个客户端exe木马输入回连端口号8888,点击start配置客户端木马的回连地址:192.168.32.134将文件保存在桌面开启360杀毒,直接报毒,不免杀1、将生成的客户端木马:Server.exe在EncryptionToolV3.0中以base64加密方式打开打开之后,将base6

  • 什么是Hackbar?

    什么是Hackbar?**什么是Hackbar?**Hackbar是一个Firefox的插件,它的功能类似于地址栏,但是它里面的数据不受服务器的相应触发的重定向等其它变化的影响.有网址的载入于访问,联合查询,各种编码,数据加密功能.这个Hackbar可以帮助你在测试SQL注入,XSS漏洞和网站的安全性,主要是帮助开发人员做代码的安全审计,检查代码,寻找安全漏洞…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号