分布式应用技术

分布式应用技术

本文讲一些常见的分布式应用层面的技术,其中大部分都依赖于Zookeeper,所以对zookeeper不熟悉的同学可以先看我之前写的两往篇博客Zookeeper编程(二)Zookeeper编程(一)

我们的推荐程序部署在多台服务器上,每天凌晨会去重建信息索引(索引存在Redis上)。建索引这件事情只能让一台服务器去做,其他服务器歇着,这种应用场景自然想到了分布式锁,谁抢到锁谁来建索引。我们在线上分别使用过3种分布式锁:基于zookeeper的锁,基于MySQL的锁,基于Redis的锁。MySQL lock最稳定;Redis lock使用起来最灵活,同时实时起来也最方便。所以现在我们线上的分布式锁全换成了Redis Lock。

zookeeper lock的实现原理参见链接,不再赘述。

MySQL lock的基本思想是大家都去写同一条数据库记录,谁先写上谁获取锁,删除这条记录就相当于释放了锁。整个流程看起来像这样:

begin;
select count(*) from table where lockname='xxx' for update;
if count==0:
    insert into table (lockname) values ('xxx);
    do the task which must be done once
    delete from table where lockname='xxx'
commit;

如果2个进程同时执行第一步,发现lockname不存在,于是都去添加一行记录,还都添加成功了,那岂不是2个进程都获得了锁?莫急,看见select语句后面有个”for update”吗?当where条件中不包含主键时,select … for update将会锁表,事务提交后才释放表上的锁。(for update仅适用于InnoDB)

Redis lock的实现思想跟mysql相同,不过操作起来更简单。看下面的代码

Long i = jedis.setnx(lockName, lockName);    // 若key不存在,则存储 ,并返回1
if (i == 1L) {
    // 设置key的过期时间
    if (live < 0) {
        live = DEFAULT_EXPIRE_TIME;
    }
    jedis.expire(lockName, live);
    logger.info("get redis lock " + lockName + " ,live " + live
            + " seconds");

    rect = true;        //获得锁返回true
} else { // 已存在锁
    logger.info("lockName: " + lockName
            + " locked by other business");
    rect = false;        //没有获得锁返回false
}

核心操作是redis提供的setnx()方法,它来保证并发情况下中有1个进程能写成功。另外我们还为redis的key设置了超时时间,即使你获得锁后忘记了释放锁,或者在释放锁之前进程死掉了,不用担心,在达到超时时间后该锁也是会自动释放的。

Barrier

接着上面的应用场景讲。在推荐系统中,建完信息索引后就要开始为每个用户进行推荐了。推荐任务要分发到每台服务器上去执行,我们没有做单独的任务分发器,而是每台服务器都去同一个数据表里读取所有的用户ID,userid % n == 自己的编号时(n是服务器总数),该服务器就计算这个用户的推荐。计算推荐的过程也伴随着计算用户兴趣,所有用户的推荐计算完毕后,兴趣也就计算完毕了,此时又需要建立兴趣索引。建立兴趣索引又是只能由一台服务器来做的事情。这里有2个关键节点,即必须建完兴趣索引后所有服务器才能开始计算推荐,所有服务器计算完推荐后才能开始建兴趣索引。分布式环境下各服务器之间要想达成这种默契就需要借助于DoubleBarrier。

Barrier是指:

1)所有的线程都到达barrier后才能进行后续的计算

或者

2)所有的线程都完成自己的计算后才能离开barrier

Double Barrier是指同时具有上述两点。

Double Barrier的实现:

复制代码
enter barrier:
1.建一个根节点"/root"
2.想进入barrier的线程在"/root"下建立一个子节点"/root/c_i"
3.循环监听"/root"孩子节点数目的变化,当其达到size时就说明有size个线程都已经barrier点了

leave barrier:
1.想离开barrier的线程删除其在"/root"下建立的子节点
2.循环监听"/root"孩子节点数目的变化,当size减到0时它就可以离开barrier了

服务注册

 继续研究上面的应用场景,我们提到每台服务器遇到“userid % n == 自己的编号时(n是服务器总数)”这样的用户时才为其计算推荐,这里有两个问题:

  1. 集群中服务器的总数如何获取?如果直接设置成上线的服务器的个数会存在2个问题:将来服务器数目增加了n还得跟着改;如果哪天某台服务器进程挂了,那就造成1/n的用户没有推荐数据。
  2. 本服务器在集群中的编号如何获得?

解决办法是:

每台服务器进程启动时在特定的zookeeper路径下添加一个EPHEMERAL节点,节点是存储的数据为自己的IP(或者其他能唯一标识一台服务器的东西)。之所以要求是EPHEMERAL类型,是因为当进程死掉后该zookeeper节点会自动被删除掉。每天凌晨每台服务器去获取特定zookeeper路径下所有的子节点,子节点数目即为集群中服务器总数。根据IP每台服务器就可以知道自己在所有的孩子节点中排名第几。

ServerCluster.java

分布式应用技术
分布式应用技术

 1 import java.util.List;  2 import java.util.concurrent.Executors;  3 import java.util.concurrent.ScheduledExecutorService;  4 import java.util.concurrent.TimeUnit;  5  6 import org.apache.commons.logging.Log;  7 import org.apache.commons.logging.LogFactory;  8 import org.apache.curator.framework.CuratorFramework;  9 import org.apache.curator.framework.api.CreateBuilder;  10 import org.apache.zookeeper.CreateMode;  11  12  13 /**  14  *  15  * @Author:orisun  16  * @Since:2016-4-7  17  * @Version:1.0  18 */  19 public class ServerCluster {  20  21 private static Log logger = LogFactory.getLog(ServerCluster.class);  22 private static final String BASE_PATH = ZkClient.getInstance().getBasePath() + "/cluster";  23 private static ScheduledExecutorService exec = Executors  24  .newSingleThreadScheduledExecutor();  25  26 /**  27  * 向集群上报自己的存在,即把自己的IP写到特定的zk节点(EPHEMERAL节点)上去  28 */  29 public static void reportServer() {  30 String selfIP = NIC.getLocalIP();  31 CuratorFramework zkClient = ZkClient.getInstance().getZkClient();  32 boolean exists = false;  33 try {  34 CreateBuilder cb = zkClient.create();  35 if (zkClient.checkExists().forPath(BASE_PATH) == null) {  36  cb.creatingParentsIfNeeded().forPath(BASE_PATH,  37 new byte[] { 0 });  38  }  39 List<String> children = zkClient.getChildren().forPath(BASE_PATH);  40 if (children != null && children.indexOf(selfIP) >= 0) {  41 exists = true;  42  }  43 if (!exists) {  44 // EPHEMERAL节点,进程终止时zookeeper连接断开,节点自动被删除  45  cb.creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)  46 .forPath(BASE_PATH + "/" + selfIP, new byte[] { 0 });  47 logger.info(selfIP + " add to cluster");  48 } else {  49 // 如果发现cluster上已存在该IP,则5秒后再确认一下  50 logger.info(selfIP + " is already in cluster");  51 Thread.sleep(1000 * 5);  52 children = zkClient.getChildren().forPath(BASE_PATH);  53 exists = false;  54 if (children != null && children.indexOf(selfIP) >= 0) {  55 exists = true;  56  }  57 if (!exists) {  58 // EPHEMERAL节点,进程终止时zookeeper连接断开,节点自动被删除  59  cb.creatingParentsIfNeeded()  60  .withMode(CreateMode.EPHEMERAL)  61 .forPath(BASE_PATH + "/" + selfIP, new byte[] { 0 });  62 logger.info(selfIP + " add to cluster");  63  }  64  }  65 } catch (Exception e) {  66 logger.fatal("report to cluster failed", e);  67  }  68  }  69  70 /**  71  * 向集群上报自己的存在,即把自己的IP写到特定的zk节点(EPHEMERAL节点)上去<br>  72  * 为防止zookeeper会话断开而造成节点被删除,每隔10分钟就去写一次  73 */  74 public static void report() {  75 exec.scheduleAtFixedRate(new Runnable() {  76  @Override  77 public void run() {  78  reportServer();  79  }  80 }, 0, 10, TimeUnit.MINUTES);  81  }  82  83 /**  84  * 获取集群中有多少台机器  85  *  86  * @return  87 */  88 public static int getClusterSize() {  89 int total = 0;  90 List<String> children = null;  91 try {  92 CuratorFramework zkClient = ZkClient.getInstance().getZkClient();  93 children = zkClient.getChildren().forPath(BASE_PATH);  94 } catch (Exception e) {  95 logger.error("get children of " + BASE_PATH + " failed", e);  96  }  97 if (children != null) {  98 total = children.size();  99  } 100 logger.info("cluster size is " + total); 101 return total; 102  } 103 104 /** 105  * 获取自己在集群中的编码(从0开始) 106  * 107  * @return 108 */ 109 public static int getIndexInCluster() { 110 int index = -1; 111 CuratorFramework zkClient = ZkClient.getInstance().getZkClient(); 112 try { 113 List<String> children = zkClient.getChildren().forPath(BASE_PATH); 114 String selfIP = NIC.getLocalIP(); 115 index = children.indexOf(selfIP); 116 } catch (Exception e) { 117 logger.fatal("get cluster info failed", e); 118  } 119 logger.info("this server's index is " + index); 120 return index; 121  } 122 }

View Code

消息通知

 经常有这样的业务场景,一台服务器执行完任务A后,需要通达其他几台服务器执行任务B。当然们可以借助于MQ框架来实现这种通知的功能,但是这种消息通常一天只发一次,为其单独创建一个消息主题未免太浪费。于是我们又想到了zookeeper,各服务器去监听同一个zookeeper节点的变化,当一台服务器执行完任务A后去把zookeeper节点的值更新当时时间,其他服务器监听到zookeeper节点有变化后,先确认一下变化为UPDATE类型,然后执行任务B,执行完后再去zookeeper节点上添加一个watcher(因为watcher只能使用一次,所以需要重复监听)。在执行任务B的过程中如果节点有变化,其他服务器是感知不到的,这样就防止了任务B在同一个进程内并行执行。

配置管理

在推荐系统中有很多配置参数,我们不想因为参数的改动而走一次上线流程(要知道参数的改动还是比较频繁的)。简单的做法是参数都写入数据库中,要改参数直接改数据库即可,程序每天去跟数据库同步一次参数。这种做法缺点很明显,就是会有一天的延迟。为了能让程序能实时感知到参数的变化,我们把参数写在zookeeper上,程序去监听zookeeper节点的变化,当有变化时从zookeeper上读取最新有值即可。考虑到zookeeper的稳定时不如MySQL,我们会在MySQL上同时存一份配置参数,通过后台修改参数时会同时修改MySQL和zookeeper。每天应用程序去跟MySQL做一次同步,zookeeper仅作为实时通知的一个工具存在。

 ZkParam.java

分布式应用技术
分布式应用技术

 1 /**  2  *  3  *@Author:orisun  4  *@Since:2016-4-7  5  *@Version:1.0  6 */  7 public class ZkParam {  8  9 private double value; 10 private String path; 11 private int logicid; 12 13 public ZkParam(double defaultValue, String path, int logicid) { 14 this.value = defaultValue; 15 this.path = path; 16 this.logicid = logicid; 17  } 18 19 public double getValue() { 20 return value; 21  } 22 23 public void setValue(double value) { 24 this.value = value; 25  } 26 27 public String getPath() { 28 return path; 29  } 30 31 public void setPath(String path) { 32 this.path = path; 33  } 34 35 public int getLogicid() { 36 return logicid; 37  } 38 39 public void setLogicid(int logicid) { 40 this.logicid = logicid; 41  } 42 }

View Code

ZkConfig.java

分布式应用技术
分布式应用技术

 1 import java.lang.reflect.Field;  2 import java.lang.reflect.InvocationTargetException;  3 import java.lang.reflect.Method;  4 import java.util.concurrent.ExecutorService;  5 import java.util.concurrent.Executors;  6  7 import org.apache.commons.logging.Log;  8 import org.apache.commons.logging.LogFactory;  9 import org.apache.curator.framework.CuratorFramework;  10 import org.apache.curator.framework.api.CreateBuilder;  11 import org.apache.curator.framework.recipes.cache.NodeCache;  12 import org.apache.curator.framework.recipes.cache.NodeCacheListener;  13  14 /**  15  *  16  * @Author:orisun  17  * @Since:2016-4-7  18  * @Version:1.0  19 */  20 public abstract class ZkConfig {  21  22 private static Log logger = LogFactory.getLog(ZkConfig.class);  23 private static final String ZK_PARAM_NAME = ZkParam.class  24  .getCanonicalName();  25 private Class<?> zkArgClz = null;  26 private static Method getZkPathMethod = null;  27 private static Method getIdMethod = null;  28 private static Method getValueMethod = null;  29 private ExecutorService exec = null;  30  31 public ZkConfig() {  32 exec = Executors.newCachedThreadPool();  33 try {  34 zkArgClz = Class.forName(ZK_PARAM_NAME);  35 getZkPathMethod = zkArgClz.getMethod("getPath");  36 getIdMethod = zkArgClz.getMethod("getLogicid");  37 getValueMethod = zkArgClz.getMethod("getValue");  38 } catch (Exception e) {  39 logger.fatal("build " + ZK_PARAM_NAME + " failed", e);  40 System.exit(1);  41  }  42  }  43  44 public void updateParam(String filedName, ZkParam newArgument) {  45 try {  46 Method method = this.getClass().getMethod(pareSetName(filedName),  47 ZkParam.class);  48 method.invoke(this, newArgument);  49 logger.info("set " + filedName + " to " + newArgument.getValue());  50 } catch (NoSuchMethodException | SecurityException  51 | IllegalAccessException | IllegalArgumentException  52 | InvocationTargetException e) {  53  logger.error(  54 "zookeeper node is changed, but update system parameter failed",  55  e);  56  }  57  }  58  59 /**  60  * 添加zookeeper监听,参数变化时及时反应到推荐系统中  61 */  62 @SuppressWarnings("resource")  63 private void addListener() {  64 final CuratorFramework zkClient = ZkClient.getInstance().getZkClient();  65 try {  66 Field[] fields = this.getClass().getDeclaredFields();// 父类中的成员获取不到  67 for (final Field field : fields) {  68 field.setAccessible(true);  69 if (field.getType().getCanonicalName().equals(ZK_PARAM_NAME)) {  70 Object zkParamInst = field.get(this);  71 final String path = (String) getZkPathMethod  72  .invoke(zkParamInst);  73 if (zkClient.checkExists().forPath(path) != null) {  74 final int logicid = (int) getIdMethod  75  .invoke(zkParamInst);  76 NodeCache nodeCache = new NodeCache(zkClient, path,  77 false);  78 nodeCache.start(true);  79  nodeCache.getListenable().addListener(  80 new NodeCacheListener() {  81  @Override  82 public void nodeChanged() throws Exception {  83 byte[] brr = zkClient.getData()  84  .forPath(path);  85 double newValue = Double.parseDouble(new String(  86  brr));  87 ZkParam newArgument = new ZkParam(  88  newValue, path, logicid);  89  updateParam(field.getName(),  90  newArgument);  91  }  92  }, exec);  93 logger.info("add listener to " + path);  94 } else {  95 logger.error("will add listner on zookeeper path "  96 + path + ", but it dose not exists");  97  }  98  }  99  } 100 } catch (Exception e) { 101 logger.error("add listener to zookeeper failed", e); 102 SendMail.sendMail(SystemConfig.getValue("mail_subject"), 103 SystemConfig.getValue("mail_receiver"), 104 "add listener to zookeeper failed<br>" + e.getMessage()); 105  } 106  } 107 108 /** 109  * 每天定时任务,从MySQL中读取参数的值及参数对应的zkpath,然后监听该zkpath。<br> 110  * 这是为了防止zookeeper连不上,或watcher机制失效 111 */ 112 public void readFromMysql() { 113 try { 114 final ParamConfigDao dao = new ParamConfigDao(); 115 Field[] fields = this.getClass().getDeclaredFields(); 116 for (Field field : fields) { 117 field.setAccessible(true); 118 if (field.getType().getCanonicalName().equals(ZK_PARAM_NAME)) { 119 Object zkParamInst = field.get(this); 120 String path0 = (String) getZkPathMethod.invoke(zkParamInst); 121 int logicid = (int) getIdMethod.invoke(zkParamInst); 122 ParamConfig param = dao.getByLogicId(logicid); 123 if (param != null) { 124 String path1 = param.getZkpath(); 125 String path = (path1 != null && path1.length() > 0) ? path1 126  : path0; 127 if (param != null) { 128 ZkParam newArgument = new ZkParam(param.getValue(), 129  path, logicid); 130 Method method = this.getClass() 131  .getMethod(pareSetName(field.getName()), 132 ZkParam.class); 133 method.invoke(this, newArgument); 134  } 135 } else { 136 logger.error("have no such param whoese logicid is " 137 + logicid + " im mysql"); 138  } 139  } 140  } 141 } catch (Exception e) { 142 logger.error("read param from mysql failed", e); 143 SendMail.sendMail(SystemConfig.getValue("mail_subject"), 144 SystemConfig.getValue("mail_receiver"), 145 "read param from mysql failed<br>" + e.getMessage()); 146  } 147  addListener(); 148  } 149 150 /** 151  * 把参数写入到zookeeper 152  * 153  * 154 */ 155 public void flushToZookeeper() { 156 try { 157 CuratorFramework zkClient = ZkClient.getInstance().getZkClient(); 158 CreateBuilder cb = zkClient.create(); 159 Field[] fields = this.getClass().getDeclaredFields(); 160 for (final Field field : fields) { 161 field.setAccessible(true); 162 if (field.getType().getCanonicalName().equals(ZK_PARAM_NAME)) { 163 Object zkParamInst = field.get(this); 164 String path = (String) getZkPathMethod.invoke(zkParamInst); 165 // 写之前先保证路径是存在的 166 if (zkClient.checkExists().forPath(path) == null) { 167  cb.creatingParentsIfNeeded().forPath(path, 168 new byte[] { 0 }); 169  } 170 double value = (double) getValueMethod.invoke(zkParamInst); 171  zkClient.setData().forPath(path, 172  String.valueOf(value).getBytes()); 173  } 174  } 175 } catch (Exception e) { 176 logger.error("flush behavior weight to zookeeper failed", e); 177  SendMail.sendMail( 178 SystemConfig.getValue("mail_subject"), 179 SystemConfig.getValue("mail_receiver"), 180 "flush behavior weight to zookeeper failed<br>" 181 + e.getMessage()); 182  } 183  } 184 185 /** 186  * 拼接某属性set 方法 187  * 188  * @param fldname 189  * @return 190 */ 191 public static String pareSetName(String fldname) { 192 if (null == fldname || "".equals(fldname)) { 193 return null; 194  } 195 String pro = "set" + fldname.substring(0, 1).toUpperCase() 196 + fldname.substring(1); 197 return pro; 198  } 199 200 /** 201  * 拼接某属性get 方法 202  * 203  * @param fldname 204  * @return 205 */ 206 public static String pareGetName(String fldname) { 207 if (null == fldname || "".equals(fldname)) { 208 return null; 209  } 210 String pro = "get" + fldname.substring(0, 1).toUpperCase() 211 + fldname.substring(1); 212 return pro; 213  } 214 }

View Code

使用示例

分布式应用技术
分布式应用技术

 1 /**  2  *  3  * @Author:orisun  4  * @Since:2016-4-7  5  * @Version:1.0  6 */  7 public class TalentRecParamConfig extends ZkConfig {  8  9 private static final String BASE_PATH = ZkClient.getInstance().getBasePath() + "/talent_rec"; 10 private static final String MAX_REC_NUM_PATH = BASE_PATH + "/max_rec"; 11 private static final String MIN_TITLE_SIM_PATH = BASE_PATH 12 + "/min_title_sim"; 13 /** 每个职位最多推荐多少个用户 **/ 14 private ZkParam maxRec = new ZkParam(1000, MAX_REC_NUM_PATH, 21); 15 /** 最小标题相似度阈值 **/ 16 private ZkParam mimTitleSim = new ZkParam(0.55, MIN_TITLE_SIM_PATH, 22);// TODO 17 // 该阈值可能还需要提升 18 19 private static volatile TalentRecParamConfig instance = null; 20 21 private TalentRecParamConfig() { 22 super(); 23  } 24 25 /** 26  * 单例 27  * 28  * @return 29 */ 30 public static TalentRecParamConfig getInstance() { 31 if (instance == null) { 32 synchronized (TalentRecParamConfig.class) { 33 if (instance == null) { 34 instance = new TalentRecParamConfig(); 35  } 36  } 37  } 38 return instance; 39  } 40 41 public ZkParam getMaxRec() { 42 return maxRec; 43  } 44 45 public void setMaxRec(ZkParam maxRec) { 46 this.maxRec = maxRec; 47  } 48 49 public ZkParam getMimTitleSim() { 50 return mimTitleSim; 51  } 52 53 public void setMimTitleSim(ZkParam mimTitleSim) { 54 this.mimTitleSim = mimTitleSim; 55  } 56 57 }

View Code

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/109020.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • minipcie转nvme,msata转ngff sata「建议收藏」

    minipcie转nvme,msata转ngff sata「建议收藏」现如今的电脑主板,m.2/ngff接口相当普遍。具有minipcie接口的主板大概只有工控主板和老款的主板的了。并不代表minipcie就会别淘汰了。对于消费用户,minipcie接口就是装个无线网卡。但在某些情况下,并不需要这无线网卡功能,这一接口就作他用了,比如扩展nvme固态盘或者傲腾呢?!nvme固态盘/傲腾是m.2/ngff接口的,接口不一样,不能直接安装。但是可以通过接口转接板将minipcie接口转换成m.2/ngff接口来安装nvme固态盘,NICE!常见的nvme固态盘尺寸

  • 惠普笔记本电脑自动关机是什么原因_台式老电脑经常自动关机怎么办

    惠普笔记本电脑自动关机是什么原因_台式老电脑经常自动关机怎么办运行台式惠普win7系统电脑时总会碰到各种故障问题,这不一位用户说电脑总是自动关机,怎么回事?造成电脑自动关机的原因有很多,我们要根据具体故障原因来解决,下面小编告诉大家台式惠普电脑总是自动关机的三种原因及解决方法。惠普电脑总是自动关机的解决方法一:1、首先检查是不是硬件问题造成的电脑突然关机,如果你不懂硬件设备,建议您拿到专业电脑维修点进行维修检查,请不要乱拆否则硬件设备严重损坏。2、在检查电脑…

  • spring boot dubbo配置(上古卷轴5基础整合包)

    SpringBoot整合Dubbo3.0基础配置(dubbo-spring-boot-starter)一、说明众所周知,阿里早已把dubbo捐赠给了Apache,现在dubbo由Apache在维护更新,dubbo也已经成了Apache下的顶级项目。所以本demo项目所依赖的坐标是Apache官方最新的3.0.4坐标。<dependency><groupId>org.apache.dubbo</groupId><artifac

  • SourceInsight注册码「建议收藏」

    SourceInsight注册码「建议收藏」版本:注册码:SI3US-361500-17409

  • NVIC库函数

    NVIC库函数1.voidNVIC_Init(NVIC_InitTypeDef*NVIC_InitStruct)功能:根据NVIC_InitStruct结构体变量中的参数初始化NVIC外设注释:结构体中的NVIC_IRQChannel成员赋值要到stm32f10x.h中的IRQn_Type(STM32F10x中断数定义)去复制例如:NVIC_Init(&amp;NVIC_InitStructur…

  • Byte数组转byte数组_java object对象转数组

    Byte数组转byte数组_java object对象转数组这里用到了java对象的序列化,即要求要转换成Byte数组的对象必须是可序列化的。java代码如下:/***对象转Byte数组**@paramobj*@return*@throwsException*/publicstaticbyte[]objectToBytes(Objectobj)throwsException{logger.debug(“object

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号