「从零单排canal 04」 启动模块deployer源码解析

「从零单排canal 04」 启动模块deployer源码解析

基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal

本文将对canal的启动模块deployer进行分析。

Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.

「从零单排canal 04」 启动模块deployer源码解析

 

模块内的类如下:

「从零单排canal 04」 启动模块deployer源码解析

 

为了能带着目的看源码,以几个问题开头,带着问题来一起探索deployer模块的源码。

  • CanalServer启动过程中配置如何加载?
  • CanalServer启动过程中涉及哪些组件?
  • 集群模式的canalServer,是如何实现instance的HA呢?
  • 每个canalServer又是怎么获取admin上的配置变更呢?

1.入口类CanalLauncher


这个类是整个canal-server的入口类。负责配置加载和启动canal-server。

主流程如下:

  • 加载canal.properties的配置内容
  • 根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
  • 如果是admin控制,使用PlainCanalConfigClient获取远程配置 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
  • 核心是用canalStarter.start()启动
  • 使用CountDownLatch保持主线程存活
  • 收到关闭信号,CDL-1,然后关闭配置更新线程池,优雅退出
 1 public static void main(String[] args) {  2  3 try {  4  5 //note:设置全局未捕获异常的处理  6  7  setGlobalUncaughtExceptionHandler();  8  9 /**  10  11  * note:  12  13  * 1.读取canal.properties的配置  14  15  * 可以手动指定配置路径名称  16  17 */  18  19 String conf = System.getProperty("canal.conf", "classpath:canal.properties");  20  21 Properties properties = new Properties();  22  23 if (conf.startsWith(CLASSPATH_URL_PREFIX)) {  24  25 conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);  26  27 properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));  28  29 } else {  30  31 properties.load(new FileInputStream(conf));  32  33  }  34  35 final CanalStarter canalStater = new CanalStarter(properties);  36  37 String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);  38  39 /**  40  41  * note:  42  43  * 2.根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了  44  45 */  46  47 if (StringUtils.isNotEmpty(managerAddress)) {  48  49 String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);  50  51 //省略一部分。。。。。。  52  53  54 /**  55  56  * note:  57  58  * 2.1使用PlainCanalConfigClient获取远程配置  59  60 */  61  62 final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,  63  64  user,  65  66  passwd,  67  68  registerIp,  69  70  Integer.parseInt(adminPort),  71  72  autoRegister,  73  74  autoCluster);  75  76 PlainCanal canalConfig = configClient.findServer(null);  77  78 if (canalConfig == null) {  79  80 throw new IllegalArgumentException("managerAddress:" + managerAddress  81  82 + " can't not found config for [" + registerIp + ":" + adminPort  83  84 + "]");  85  86  }  87  88 Properties managerProperties = canalConfig.getProperties();  89  90 // merge local  91  92  managerProperties.putAll(properties);  93  94 int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,  95  96  CanalConstants.CANAL_AUTO_SCAN_INTERVAL,  97  98 "5"));  99 100 /** 101 102  * note: 103 104  * 2.2 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) 105 106 */ 107 108 executor.scheduleWithFixedDelay(new Runnable() { 109 110 private PlainCanal lastCanalConfig; 111 112 public void run() { 113 114 try { 115 116 if (lastCanalConfig == null) { 117 118 lastCanalConfig = configClient.findServer(null); 119 120 } else { 121 122 PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5()); 123 124 /** 125 126  * note: 127 128  * 2.3 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server 129 130 */ 131 132 if (newCanalConfig != null) { 133 134 // 远程配置canal.properties修改重新加载整个应用 135 136  canalStater.stop(); 137 138 Properties managerProperties = newCanalConfig.getProperties(); 139 140 // merge local 141 142  managerProperties.putAll(properties); 143 144  canalStater.setProperties(managerProperties); 145 146  canalStater.start(); 147 148 lastCanalConfig = newCanalConfig; 149 150  } 151 152  } 153 154 } catch (Throwable e) { 155 156 logger.error("scan failed", e); 157 158  } 159 160  } 161 162 }, 0, scanIntervalInSecond, TimeUnit.SECONDS); 163 164  canalStater.setProperties(managerProperties); 165 166 } else { 167 168  canalStater.setProperties(properties); 169 170  } 171 172  canalStater.start(); 173 174 //note: 这样用CDL处理和while(true)有点类似 175 176  runningLatch.await(); 177 178  executor.shutdownNow(); 179 180 } catch (Throwable e) { 181 182 logger.error("## Something goes wrong when starting up the canal Server:", e); 183 184  } 185 186 }

 

2.启动类CanalStarter


从上面的入口类,我们可以看到canal-server真正的启动逻辑在CanalStarter类的start方法。

这里先对三个对象进行辨析:

  • CanalController:是canalServer真正的启动控制器
  • canalMQStarter:用来启动mqProducer。如果serverMode选择了mq,那么会用canalMQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq
  • CanalAdminWithNetty:这个不是admin控制台,而是对本server启动一个netty服务,让admin控制台通过请求获取当前server的信息,比如运行状态、正在本server上运行的instance信息等

start方法主要逻辑如下:

  • 根据配置的serverMode,决定使用CanalMQProducer或者canalServerWithNetty
  • 启动CanalController
  • 注册shutdownHook
  • 如果CanalMQProducer不为空,启动canalMQStarter(内部使用CanalMQProducer将消息投递给mq)
  • 启动CanalAdminWithNetty做服务器
 1 public synchronized void start() throws Throwable {  2  3 String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);  4  5 /**  6  7  * note  8  9  * 1.如果canal.serverMode不是tcp,加载CanalMQProducer,并且启动CanalMQProducer  10  11  * 回头可以深入研究下ExtensionLoader类的相关实现  12  13 */  14  15 if (!"tcp".equalsIgnoreCase(serverMode)) {  16  17 ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class);  18  19 canalMQProducer = loader  20  21  .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR);  22  23 if (canalMQProducer != null) {  24  25 ClassLoader cl = Thread.currentThread().getContextClassLoader();  26  27  Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader());  28  29  canalMQProducer.init(properties);  30  31  Thread.currentThread().setContextClassLoader(cl);  32  33  }  34  35  }  36  37 //note 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?)  38  39 if (canalMQProducer != null) {  40  41 MQProperties mqProperties = canalMQProducer.getMqProperties();  42  43 // disable netty  44  45 System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");  46  47 if (mqProperties.isFlatMessage()) {  48  49 // 设置为raw避免ByteString->Entry的二次解析  50  51 System.setProperty("canal.instance.memory.rawEntry", "false");  52  53  }  54  55  }  56  57 controller = new CanalController(properties);  58  59 //note 2.启动canalController  60  61  controller.start();  62  63 //note 3.注册了一个shutdownHook,系统退出时执行相关逻辑  64  65 shutdownThread = new Thread() {  66  67 public void run() {  68  69 try {  70  71  controller.stop();  72  73 //note 主线程退出  74  75  CanalLauncher.runningLatch.countDown();  76  77 } catch (Throwable e) {  78  79  80 } finally {  81  82  }  83  84  }  85  86  };  87  88  Runtime.getRuntime().addShutdownHook(shutdownThread);  89  90 //note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。  91  92 if (canalMQProducer != null) {  93  94 canalMQStarter = new CanalMQStarter(canalMQProducer);  95  96 String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);  97  98  canalMQStarter.start(destinations);  99 100  controller.setCanalMQStarter(canalMQStarter); 101 102  } 103 104 // start canalAdmin 105 106 String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT); 107 108 //note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器 109 110 if (canalAdmin == null && StringUtils.isNotEmpty(port)) { 111 112 String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER); 113 114 String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); 115 116 CanalAdminController canalAdmin = new CanalAdminController(this); 117 118  canalAdmin.setUser(user); 119 120  canalAdmin.setPasswd(passwd); 121 122 String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP); 123 124 CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance(); 125 126  canalAdminWithNetty.setCanalAdmin(canalAdmin); 127 128  canalAdminWithNetty.setPort(Integer.parseInt(port)); 129 130  canalAdminWithNetty.setIp(ip); 131 132  canalAdminWithNetty.start(); 133 134 this.canalAdmin = canalAdminWithNetty; 135 136  } 137 138 running = true; 139 140 }

 

3.CanalController


前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。

这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。

3.1 从构造器开始了解

整体初始化的顺序如下:

  • 构建PlainCanalConfigClient,用于用户远程配置的获取
  • 初始化全局配置,顺便把instance相关的全局配置初始化一下
  • 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
  • 初始化zkClient
  • 初始化ServerRunningMonitors,作为instance 运行节点控制
  • 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)

这里有几个机制要详细介绍一下。

3.1.1 CanalServer两种模式

canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。

在构造器中初始化代码部分如下:

 1 // 3.准备canal server  2  3 //note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq  4  5 // 是不需要这个netty的)  6  7 ip = getProperty(properties, CanalConstants.CANAL_IP);  8  9 //省略一部分。。。 10 11 embededCanalServer = CanalServerWithEmbedded.instance(); 12 13 embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator 14 15 int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); 16 17 //省略一部分。。。 18 19 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY); 20 21 if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) { 22 23 canalServer = CanalServerWithNetty.instance(); 24 25  canalServer.setIp(ip); 26 27  canalServer.setPort(port); 28 29 }

 

embededCanalServer:类型为CanalServerWithEmbedded

canalServer:类型为CanalServerWithNetty

二者有什么区别呢?

都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种类型的实现,canal官方文档有以下描述:

「从零单排canal 04」 启动模块deployer源码解析

 

说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。

如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。

因此,在构造器中,我们看到,

用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,

而ip和port被设置到CanalServerWithNetty中。

关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。

 

3.1.2 ServerRunningMonitor

在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。

ServerRunningMonitor是做什么的呢?

我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。

 1 /**  2  3 * 针对server的running节点控制  4  5 */  6  7 public class ServerRunningMonitor extends AbstractCanalLifeCycle {  8  9 private static final Logger logger = LoggerFactory.getLogger(ServerRunningMonitor.class); 10 11 private ZkClientx zkClient; 12 13 private String destination; 14 15 private IZkDataListener dataListener; 16 17 private BooleanMutex mutex = new BooleanMutex(false); 18 19 private volatile boolean release = false; 20 21 // 当前服务节点状态信息 22 23 private ServerRunningData serverData; 24 25 // 当前实际运行的节点状态信息 26 27 private volatile ServerRunningData activeData; 28 29 private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); 30 31 private int delayTime = 5; 32 33 private ServerRunningListener listener; 34 35 public ServerRunningMonitor(ServerRunningData serverData){ 36 37 this(); 38 39 this.serverData = serverData; 40 41  } 42 //。。。。。 43 44 }

 

在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。

ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。

主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。

具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。

 1 new Function<String, ServerRunningMonitor>() {  2  3 public ServerRunningMonitor apply(final String destination) {  4  5 ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);  6  7  runningMonitor.setDestination(destination);  8  9 runningMonitor.setListener(new ServerRunningListener() {  10  11 /**  12  13  * note  14  15  * 1.内部调用了embededCanalServer的start(destination)方法。  16  17  * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的,  18  19  * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。  20  21  * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。  22  23  *  24  25  * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination  26  27 */  28  29 public void processActiveEnter() {  30  31 //省略具体内容。。。  32  }  33  34 /**  35  36  * note  37  38  * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination  39  40  * 2.停止embedeCanalServer的destination  41  42 */  43  44 public void processActiveExit() {  45  46 //省略具体内容。。。  47  48  }  49  50 /**  51  52  * note  53  54  * 在Canalinstance启动之前,destination注册到ZK上,创建节点  55  56  * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。  57  58  * 此方法会在processActiveEnter()之前被调用  59  60 */  61  62 public void processStart() {  63  64 //省略具体内容。。。  65  66  }  67  68 /**  69  70  * note  71  72  * 在Canalinstance停止前,把ZK上节点删除掉  73  74  * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。  75  76  * 此方法会在processActiveExit()之前被调用  77  78 */  79  80 public void processStop() {  81  82 //省略具体内容。。。  83  }  84  85  });  86  87 if (zkclientx != null) {  88  89  runningMonitor.setZkClient(zkclientx);  90  91  }  92  93 // 触发创建一下cid节点  94  95  runningMonitor.init();  96  97 return runningMonitor;  98  99  } 100 101 }

 

3.2 canalController的start方法

具体运行逻辑如下:

  • 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
  • 先启动embededCanalServer(会启动对应的监控)
  • 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
  • 如果cannalServer不为空,启动canServer (canalServerWithNetty)

这里需要注意,canalServer什么时候为空?

如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。

所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。

 1 public void start() throws Throwable {  2  3 // 创建整个canal的工作节点  4  5 final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port);  6  7  initCid(path);  8  9 if (zkclientx != null) {  10  11 this.zkclientx.subscribeStateChanges(new IZkStateListener() {  12  13 public void handleStateChanged(KeeperState state) throws Exception {  14  15  }  16  17 public void handleNewSession() throws Exception {  18  19  initCid(path);  20  21  }  22  23  @Override  24  25 public void handleSessionEstablishmentError(Throwable error) throws Exception{  26  27 logger.error("failed to connect to zookeeper", error);  28  29  }  30  31  });  32  33  }  34  35 // 先启动embeded服务  36  37  embededCanalServer.start();  38  39 // 尝试启动一下非lazy状态的通道  40  41 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {  42  43 final String destination = entry.getKey();  44  45 InstanceConfig config = entry.getValue();  46  47 // 创建destination的工作节点  48  49 if (!embededCanalServer.isStart(destination)) {  50  51 // HA机制启动  52  53 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);  54  55 if (!config.getLazy() && !runningMonitor.isStart()) {  56  57  runningMonitor.start();  58  59  }  60  61  }  62  63 //note:为每个instance注册一个配置监视器  64  65 if (autoScan) {  66  67  instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);  68  69  }  70  71  }  72  73 if (autoScan) {  74  75 //note:启动线程定时去扫描配置  76  77  instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();  78  79 //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一  80  81 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {  82  83 if (!monitor.isStart()) {  84  85  monitor.start();  86  87  }  88  89  }  90  91  }  92  93 // 启动网络接口  94  95 if (canalServer != null) {  96  97  canalServer.start();  98  99  } 100 101 }

 

我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。

入口在runningMonitor.start()。

  • 如果zkClient != null,就用zk进行HA启动
  • 否则,就直接processActiveEnter启动,这个我们前面已经分析过了
 1 public synchronized void start() {  2  3 super.start();  4  5 try {  6  7 /**  8  9  * note 10 11  * 内部会调用ServerRunningListener的processStart()方法 12 13 */ 14 15  processStart(); 16 17 if (zkClient != null) { 18 19 // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start 20 21 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); 22 23  zkClient.subscribeDataChanges(path, dataListener); 24 25  initRunning(); 26 27 } else { 28 29 /** 30 31  * note 32 33  * 内部直接调用ServerRunningListener的processActiveEnter()方法 34 35 */ 36 37 processActiveEnter();// 没有zk,直接启动 38 39  } 40 41 } catch (Exception e) { 42 43 logger.error("start failed", e); 44 45 // 没有正常启动,重置一下状态,避免干扰下一次start 46 47  stop(); 48 49  } 50 51 }

 

重点关注下HA启动方式,一般 我们都采用这种模式进行。

在集群模式下,可能会有多个canal server共同处理同一个destination,

在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。

同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!

启动的重点还是在initRuning()。

利用zk来保证集群中有且只有 一个instance任务在运行。

  • 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
  • 尝试创建临时节点。
  • 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
  • 如果创建成功,就说明没有其他server启动这个instance,可以创建
 1 private void initRunning() {  2 if (!isStart()) {  3 return;  4  }  5  6  7 //note: 还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running  8 String path = ZookeeperPathUtils.getDestinationServerRunning(destination);  9 // 序列化 10 byte[] bytes = JsonUtils.marshalToByte(serverData); 11 try { 12 mutex.set(false); 13 /** 14  * note: 15  * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。 16  * 此时会抛出ZkNodeExistsException,进入catch代码块。 17 */ 18  zkClient.create(path, bytes, CreateMode.EPHEMERAL); 19 /** 20  * note: 21  * 如果创建成功,就开始触发启动事件 22 */ 23 activeData = serverData; 24 processActiveEnter();// 触发一下事件 25 mutex.set(true); 26 release = false; 27 } catch (ZkNodeExistsException e) { 28 /** 29  * note: 30  * 如果捕获异常,表示创建失败。 31  * 就根据临时节点路径查一下是哪个canal-sever创建了。 32  * 如果没有相关信息,马上重新尝试一下。 33  * 如果确实存在,就把相关信息保存下来 34 */ 35 bytes = zkClient.readData(path, true); 36 if (bytes == null) {// 如果不存在节点,立即尝试一次 37  initRunning(); 38 } else { 39 activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); 40  } 41 } catch (ZkNoNodeException e) { 42 /** 43  * note: 44  * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。 45 */ 46 zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 47  initRunning(); 48  } 49 }

 

那运行中的HA是如何实现的呢,我们回头看一下

zkClient.subscribeDataChanges(path, dataListener);
对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。

dataListener是在ServerRunningMonitor的构造方法中初始化的,

包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :

 
 1 public ServerRunningMonitor(){  2 // 创建父节点  3 dataListener = new IZkDataListener() {  4 /**  5  * note:  6  * 当注册节点发生变化时,会自动回调这个方法。  7  * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢?  8  * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。  9  * 可以 触发 HA。 10 */ 11 public void handleDataChange(String dataPath, Object data) throws Exception { 12 MDC.put("destination", destination); 13 ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); 14 if (!isMine(runningData.getAddress())) { 15 mutex.set(false); 16  } 17 18 if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active 19 releaseRunning();// 彻底释放mainstem 20  } 21 22 activeData = (ServerRunningData) runningData; 23  } 24 25 26 /** 27  * note: 28  * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去 29 */ 30 public void handleDataDeleted(String dataPath) throws Exception { 31 MDC.put("destination", destination); 32 mutex.set(false); 33 if (!release && activeData != null && isMine(activeData.getAddress())) { 34 // 如果上一次active的状态就是本机,则即时触发一下active抢占 35  initRunning(); 36 } else { 37 // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作 38 delayExector.schedule(new Runnable() { 39 public void run() { 40  initRunning(); 41  } 42  }, delayTime, TimeUnit.SECONDS); 43  } 44  } 45  }; 46 }
 

当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。

我们回想一下使用过程中,什么时候可能 改变节点当状态呢?

就是在控制台中,对canal-server中正在运行的 instance做”停止”操作时,改变了isActive,可以 触发 HA。

如下图所示

「从零单排canal 04」 启动模块deployer源码解析

 

4.admin的配置监控原理

我们现在采用admin做全局的配置控制。

那么每个canalServer是怎么监控配置的变化呢?

还记得上吗cananlController的start方法中对配置监视器的启动吗?

 1 if (autoScan) {  2 //note:启动线程定时去扫描配置  3  instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();  4 //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一  5 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {  6 if (!monitor.isStart()) {  7  monitor.start();  8  }  9  } 10 }

 

这个就是关键的配置监控。

我们来看deployer模块中的monitor包了。

「从零单排canal 04」 启动模块deployer源码解析

 

4.1 InstanceAction

是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。

 1 /**  2 * config配置变化后的动作  3 *  4 */  5 public interface InstanceAction {  6  7  8 /**  9  * 启动destination 10 */ 11 void start(String destination); 12 13 14 /** 15  * 主动释放destination运行 16 */ 17 void release(String destination); 18 19 20 /** 21  * 停止destination 22 */ 23 void stop(String destination); 24 25 26 /** 27  * 重载destination,可能需要stop,start操作,或者只是更新下内存配置 28 */ 29 void reload(String destination); 30 }

 

具体实现在canalController的构造器中实现了匿名类。

4.2 InstanceConfigMonitor

这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。

我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。

原理很简单。

  • 采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
  • 然后通过defaultAction去start
  • 这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。
 1 /**  2 * 基于manager配置的实现  3 *  4 */  5 public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle {  6  7  8 private static final Logger logger = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class);  9 private long scanIntervalInSecond = 5; 10 private InstanceAction defaultAction = null; 11 /** 12  * note: 13  * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction 14 */ 15 private Map<String, InstanceAction> actions = new MapMaker().makeMap(); 16 /** 17  * note: 18  * 每个instance对应的远程配置 19 */ 20 private Map<String, PlainCanal> configs = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() { 21 public PlainCanal apply(String destination) { 22 return new PlainCanal(); 23  } 24  }); 25 /** 26  * note: 27  * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置 28 */ 29 private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, 30 new NamedThreadFactory("canal-instance-scan")); 31 32 private volatile boolean isFirst = true; 33 /** 34  * note: 35  * 拉取admin配置的client 36 */ 37 private PlainCanalConfigClient configClient; 38 // 39 }

 

5.总结

deployer模块的主要作用:

1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。

2)确定canal-server的启动方式:独立启动或者集群方式启动

3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA

4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置

5)启动canal server,监听客户端请求

这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。

都看到最后了,原创不易,点个关注,点个赞吧~

文章持续更新,可以微信搜索「阿丸笔记 」第一时间阅读,回复关键字【学习】有我准备的一线大厂面试资料。

知识碎片重新梳理,构建Java知识图谱:
github.com/saigu/JavaK…(历史文章查阅非常方便)

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/2613.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号