


我们的推荐程序部署在多台服务器上,每天凌晨会去重建信息索引(索引存在Redis上)。建索引这件事情只能让一台服务器去做,其他服务器歇着,这种应用场景自然想到了分布式锁,谁抢到锁谁来建索引。我们在线上分别使用过3种分布式锁:基于zookeeper的锁,基于MySQL的锁,基于Redis的锁。MySQL lock最稳定;Redis lock使用起来最灵活,同时实时起来也最方便。所以现在我们线上的分布式锁全换成了Redis Lock。

zookeeper lock的实现原理参见链接,不再赘述。

MySQL lock的基本思想是大家都去写同一条数据库记录,谁先写上谁获取锁,删除这条记录就相当于释放了锁。整个流程看起来像这样:

select count(*) from table where lockname='xxx' for update;
if count==0:
    insert into table (lockname) values ('xxx);
    do the task which must be done once
    delete from table where lockname='xxx'

如果2个进程同时执行第一步,发现lockname不存在,于是都去添加一行记录,还都添加成功了,那岂不是2个进程都获得了锁?莫急,看见select语句后面有个”for update”吗?当where条件中不包含主键时,select … for update将会锁表,事务提交后才释放表上的锁。(for update仅适用于InnoDB)

Redis lock的实现思想跟mysql相同,不过操作起来更简单。看下面的代码

Long i = jedis.setnx(lockName, lockName);    // 若key不存在,则存储 ,并返回1
if (i == 1L) {
    // 设置key的过期时间
    if (live < 0) {
        live = DEFAULT_EXPIRE_TIME;
    jedis.expire(lockName, live);
    logger.info("get redis lock " + lockName + " ,live " + live
            + " seconds");

    rect = true;        //获得锁返回true
} else { // 已存在锁
    logger.info("lockName: " + lockName
            + " locked by other business");
    rect = false;        //没有获得锁返回false



接着上面的应用场景讲。在推荐系统中,建完信息索引后就要开始为每个用户进行推荐了。推荐任务要分发到每台服务器上去执行,我们没有做单独的任务分发器,而是每台服务器都去同一个数据表里读取所有的用户ID,userid % n == 自己的编号时(n是服务器总数),该服务器就计算这个用户的推荐。计算推荐的过程也伴随着计算用户兴趣,所有用户的推荐计算完毕后,兴趣也就计算完毕了,此时又需要建立兴趣索引。建立兴趣索引又是只能由一台服务器来做的事情。这里有2个关键节点,即必须建完兴趣索引后所有服务器才能开始计算推荐,所有服务器计算完推荐后才能开始建兴趣索引。分布式环境下各服务器之间要想达成这种默契就需要借助于DoubleBarrier。





Double Barrier是指同时具有上述两点。

Double Barrier的实现:

enter barrier:

leave barrier:


 继续研究上面的应用场景,我们提到每台服务器遇到“userid % n == 自己的编号时(n是服务器总数)”这样的用户时才为其计算推荐,这里有两个问题:

  1. 集群中服务器的总数如何获取?如果直接设置成上线的服务器的个数会存在2个问题:将来服务器数目增加了n还得跟着改;如果哪天某台服务器进程挂了,那就造成1/n的用户没有推荐数据。
  2. 本服务器在集群中的编号如何获得?





 1 import java.util.List;  2 import java.util.concurrent.Executors;  3 import java.util.concurrent.ScheduledExecutorService;  4 import java.util.concurrent.TimeUnit;  5  6 import org.apache.commons.logging.Log;  7 import org.apache.commons.logging.LogFactory;  8 import org.apache.curator.framework.CuratorFramework;  9 import org.apache.curator.framework.api.CreateBuilder;  10 import org.apache.zookeeper.CreateMode;  11  12  13 /**  14  *  15  * @Author:orisun  16  * @Since:2016-4-7  17  * @Version:1.0  18 */  19 public class ServerCluster {  20  21 private static Log logger = LogFactory.getLog(ServerCluster.class);  22 private static final String BASE_PATH = ZkClient.getInstance().getBasePath() + "/cluster";  23 private static ScheduledExecutorService exec = Executors  24  .newSingleThreadScheduledExecutor();  25  26 /**  27  * 向集群上报自己的存在,即把自己的IP写到特定的zk节点(EPHEMERAL节点)上去  28 */  29 public static void reportServer() {  30 String selfIP = NIC.getLocalIP();  31 CuratorFramework zkClient = ZkClient.getInstance().getZkClient();  32 boolean exists = false;  33 try {  34 CreateBuilder cb = zkClient.create();  35 if (zkClient.checkExists().forPath(BASE_PATH) == null) {  36  cb.creatingParentsIfNeeded().forPath(BASE_PATH,  37 new byte[] { 0 });  38  }  39 List<String> children = zkClient.getChildren().forPath(BASE_PATH);  40 if (children != null && children.indexOf(selfIP) >= 0) {  41 exists = true;  42  }  43 if (!exists) {  44 // EPHEMERAL节点,进程终止时zookeeper连接断开,节点自动被删除  45  cb.creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)  46 .forPath(BASE_PATH + "/" + selfIP, new byte[] { 0 });  47 logger.info(selfIP + " add to cluster");  48 } else {  49 // 如果发现cluster上已存在该IP,则5秒后再确认一下  50 logger.info(selfIP + " is already in cluster");  51 Thread.sleep(1000 * 5);  52 children = zkClient.getChildren().forPath(BASE_PATH);  53 exists = false;  54 if (children != null && children.indexOf(selfIP) >= 0) {  55 exists = true;  56  }  57 if (!exists) {  58 // EPHEMERAL节点,进程终止时zookeeper连接断开,节点自动被删除  59  cb.creatingParentsIfNeeded()  60  .withMode(CreateMode.EPHEMERAL)  61 .forPath(BASE_PATH + "/" + selfIP, new byte[] { 0 });  62 logger.info(selfIP + " add to cluster");  63  }  64  }  65 } catch (Exception e) {  66 logger.fatal("report to cluster failed", e);  67  }  68  }  69  70 /**  71  * 向集群上报自己的存在,即把自己的IP写到特定的zk节点(EPHEMERAL节点)上去<br>  72  * 为防止zookeeper会话断开而造成节点被删除,每隔10分钟就去写一次  73 */  74 public static void report() {  75 exec.scheduleAtFixedRate(new Runnable() {  76  @Override  77 public void run() {  78  reportServer();  79  }  80 }, 0, 10, TimeUnit.MINUTES);  81  }  82  83 /**  84  * 获取集群中有多少台机器  85  *  86  * @return  87 */  88 public static int getClusterSize() {  89 int total = 0;  90 List<String> children = null;  91 try {  92 CuratorFramework zkClient = ZkClient.getInstance().getZkClient();  93 children = zkClient.getChildren().forPath(BASE_PATH);  94 } catch (Exception e) {  95 logger.error("get children of " + BASE_PATH + " failed", e);  96  }  97 if (children != null) {  98 total = children.size();  99  } 100 logger.info("cluster size is " + total); 101 return total; 102  } 103 104 /** 105  * 获取自己在集群中的编码(从0开始) 106  * 107  * @return 108 */ 109 public static int getIndexInCluster() { 110 int index = -1; 111 CuratorFramework zkClient = ZkClient.getInstance().getZkClient(); 112 try { 113 List<String> children = zkClient.getChildren().forPath(BASE_PATH); 114 String selfIP = NIC.getLocalIP(); 115 index = children.indexOf(selfIP); 116 } catch (Exception e) { 117 logger.fatal("get cluster info failed", e); 118  } 119 logger.info("this server's index is " + index); 120 return index; 121  } 122 }

View Code







 1 /**  2  *  3  *@Author:orisun  4  *@Since:2016-4-7  5  *@Version:1.0  6 */  7 public class ZkParam {  8  9 private double value; 10 private String path; 11 private int logicid; 12 13 public ZkParam(double defaultValue, String path, int logicid) { 14 this.value = defaultValue; 15 this.path = path; 16 this.logicid = logicid; 17  } 18 19 public double getValue() { 20 return value; 21  } 22 23 public void setValue(double value) { 24 this.value = value; 25  } 26 27 public String getPath() { 28 return path; 29  } 30 31 public void setPath(String path) { 32 this.path = path; 33  } 34 35 public int getLogicid() { 36 return logicid; 37  } 38 39 public void setLogicid(int logicid) { 40 this.logicid = logicid; 41  } 42 }

View Code



 1 import java.lang.reflect.Field;  2 import java.lang.reflect.InvocationTargetException;  3 import java.lang.reflect.Method;  4 import java.util.concurrent.ExecutorService;  5 import java.util.concurrent.Executors;  6  7 import org.apache.commons.logging.Log;  8 import org.apache.commons.logging.LogFactory;  9 import org.apache.curator.framework.CuratorFramework;  10 import org.apache.curator.framework.api.CreateBuilder;  11 import org.apache.curator.framework.recipes.cache.NodeCache;  12 import org.apache.curator.framework.recipes.cache.NodeCacheListener;  13  14 /**  15  *  16  * @Author:orisun  17  * @Since:2016-4-7  18  * @Version:1.0  19 */  20 public abstract class ZkConfig {  21  22 private static Log logger = LogFactory.getLog(ZkConfig.class);  23 private static final String ZK_PARAM_NAME = ZkParam.class  24  .getCanonicalName();  25 private Class<?> zkArgClz = null;  26 private static Method getZkPathMethod = null;  27 private static Method getIdMethod = null;  28 private static Method getValueMethod = null;  29 private ExecutorService exec = null;  30  31 public ZkConfig() {  32 exec = Executors.newCachedThreadPool();  33 try {  34 zkArgClz = Class.forName(ZK_PARAM_NAME);  35 getZkPathMethod = zkArgClz.getMethod("getPath");  36 getIdMethod = zkArgClz.getMethod("getLogicid");  37 getValueMethod = zkArgClz.getMethod("getValue");  38 } catch (Exception e) {  39 logger.fatal("build " + ZK_PARAM_NAME + " failed", e);  40 System.exit(1);  41  }  42  }  43  44 public void updateParam(String filedName, ZkParam newArgument) {  45 try {  46 Method method = this.getClass().getMethod(pareSetName(filedName),  47 ZkParam.class);  48 method.invoke(this, newArgument);  49 logger.info("set " + filedName + " to " + newArgument.getValue());  50 } catch (NoSuchMethodException | SecurityException  51 | IllegalAccessException | IllegalArgumentException  52 | InvocationTargetException e) {  53  logger.error(  54 "zookeeper node is changed, but update system parameter failed",  55  e);  56  }  57  }  58  59 /**  60  * 添加zookeeper监听,参数变化时及时反应到推荐系统中  61 */  62 @SuppressWarnings("resource")  63 private void addListener() {  64 final CuratorFramework zkClient = ZkClient.getInstance().getZkClient();  65 try {  66 Field[] fields = this.getClass().getDeclaredFields();// 父类中的成员获取不到  67 for (final Field field : fields) {  68 field.setAccessible(true);  69 if (field.getType().getCanonicalName().equals(ZK_PARAM_NAME)) {  70 Object zkParamInst = field.get(this);  71 final String path = (String) getZkPathMethod  72  .invoke(zkParamInst);  73 if (zkClient.checkExists().forPath(path) != null) {  74 final int logicid = (int) getIdMethod  75  .invoke(zkParamInst);  76 NodeCache nodeCache = new NodeCache(zkClient, path,  77 false);  78 nodeCache.start(true);  79  nodeCache.getListenable().addListener(  80 new NodeCacheListener() {  81  @Override  82 public void nodeChanged() throws Exception {  83 byte[] brr = zkClient.getData()  84  .forPath(path);  85 double newValue = Double.parseDouble(new String(  86  brr));  87 ZkParam newArgument = new ZkParam(  88  newValue, path, logicid);  89  updateParam(field.getName(),  90  newArgument);  91  }  92  }, exec);  93 logger.info("add listener to " + path);  94 } else {  95 logger.error("will add listner on zookeeper path "  96 + path + ", but it dose not exists");  97  }  98  }  99  } 100 } catch (Exception e) { 101 logger.error("add listener to zookeeper failed", e); 102 SendMail.sendMail(SystemConfig.getValue("mail_subject"), 103 SystemConfig.getValue("mail_receiver"), 104 "add listener to zookeeper failed<br>" + e.getMessage()); 105  } 106  } 107 108 /** 109  * 每天定时任务,从MySQL中读取参数的值及参数对应的zkpath,然后监听该zkpath。<br> 110  * 这是为了防止zookeeper连不上,或watcher机制失效 111 */ 112 public void readFromMysql() { 113 try { 114 final ParamConfigDao dao = new ParamConfigDao(); 115 Field[] fields = this.getClass().getDeclaredFields(); 116 for (Field field : fields) { 117 field.setAccessible(true); 118 if (field.getType().getCanonicalName().equals(ZK_PARAM_NAME)) { 119 Object zkParamInst = field.get(this); 120 String path0 = (String) getZkPathMethod.invoke(zkParamInst); 121 int logicid = (int) getIdMethod.invoke(zkParamInst); 122 ParamConfig param = dao.getByLogicId(logicid); 123 if (param != null) { 124 String path1 = param.getZkpath(); 125 String path = (path1 != null && path1.length() > 0) ? path1 126  : path0; 127 if (param != null) { 128 ZkParam newArgument = new ZkParam(param.getValue(), 129  path, logicid); 130 Method method = this.getClass() 131  .getMethod(pareSetName(field.getName()), 132 ZkParam.class); 133 method.invoke(this, newArgument); 134  } 135 } else { 136 logger.error("have no such param whoese logicid is " 137 + logicid + " im mysql"); 138  } 139  } 140  } 141 } catch (Exception e) { 142 logger.error("read param from mysql failed", e); 143 SendMail.sendMail(SystemConfig.getValue("mail_subject"), 144 SystemConfig.getValue("mail_receiver"), 145 "read param from mysql failed<br>" + e.getMessage()); 146  } 147  addListener(); 148  } 149 150 /** 151  * 把参数写入到zookeeper 152  * 153  * 154 */ 155 public void flushToZookeeper() { 156 try { 157 CuratorFramework zkClient = ZkClient.getInstance().getZkClient(); 158 CreateBuilder cb = zkClient.create(); 159 Field[] fields = this.getClass().getDeclaredFields(); 160 for (final Field field : fields) { 161 field.setAccessible(true); 162 if (field.getType().getCanonicalName().equals(ZK_PARAM_NAME)) { 163 Object zkParamInst = field.get(this); 164 String path = (String) getZkPathMethod.invoke(zkParamInst); 165 // 写之前先保证路径是存在的 166 if (zkClient.checkExists().forPath(path) == null) { 167  cb.creatingParentsIfNeeded().forPath(path, 168 new byte[] { 0 }); 169  } 170 double value = (double) getValueMethod.invoke(zkParamInst); 171  zkClient.setData().forPath(path, 172  String.valueOf(value).getBytes()); 173  } 174  } 175 } catch (Exception e) { 176 logger.error("flush behavior weight to zookeeper failed", e); 177  SendMail.sendMail( 178 SystemConfig.getValue("mail_subject"), 179 SystemConfig.getValue("mail_receiver"), 180 "flush behavior weight to zookeeper failed<br>" 181 + e.getMessage()); 182  } 183  } 184 185 /** 186  * 拼接某属性set 方法 187  * 188  * @param fldname 189  * @return 190 */ 191 public static String pareSetName(String fldname) { 192 if (null == fldname || "".equals(fldname)) { 193 return null; 194  } 195 String pro = "set" + fldname.substring(0, 1).toUpperCase() 196 + fldname.substring(1); 197 return pro; 198  } 199 200 /** 201  * 拼接某属性get 方法 202  * 203  * @param fldname 204  * @return 205 */ 206 public static String pareGetName(String fldname) { 207 if (null == fldname || "".equals(fldname)) { 208 return null; 209  } 210 String pro = "get" + fldname.substring(0, 1).toUpperCase() 211 + fldname.substring(1); 212 return pro; 213  } 214 }

View Code



 1 /**  2  *  3  * @Author:orisun  4  * @Since:2016-4-7  5  * @Version:1.0  6 */  7 public class TalentRecParamConfig extends ZkConfig {  8  9 private static final String BASE_PATH = ZkClient.getInstance().getBasePath() + "/talent_rec"; 10 private static final String MAX_REC_NUM_PATH = BASE_PATH + "/max_rec"; 11 private static final String MIN_TITLE_SIM_PATH = BASE_PATH 12 + "/min_title_sim"; 13 /** 每个职位最多推荐多少个用户 **/ 14 private ZkParam maxRec = new ZkParam(1000, MAX_REC_NUM_PATH, 21); 15 /** 最小标题相似度阈值 **/ 16 private ZkParam mimTitleSim = new ZkParam(0.55, MIN_TITLE_SIM_PATH, 22);// TODO 17 // 该阈值可能还需要提升 18 19 private static volatile TalentRecParamConfig instance = null; 20 21 private TalentRecParamConfig() { 22 super(); 23  } 24 25 /** 26  * 单例 27  * 28  * @return 29 */ 30 public static TalentRecParamConfig getInstance() { 31 if (instance == null) { 32 synchronized (TalentRecParamConfig.class) { 33 if (instance == null) { 34 instance = new TalentRecParamConfig(); 35  } 36  } 37  } 38 return instance; 39  } 40 41 public ZkParam getMaxRec() { 42 return maxRec; 43  } 44 45 public void setMaxRec(ZkParam maxRec) { 46 this.maxRec = maxRec; 47  } 48 49 public ZkParam getMimTitleSim() { 50 return mimTitleSim; 51  } 52 53 public void setMimTitleSim(ZkParam mimTitleSim) { 54 this.mimTitleSim = mimTitleSim; 55  } 56 57 }

View Code


版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。


【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...



  • minipcie转nvme,msata转ngff sata「建议收藏」

    minipcie转nvme,msata转ngff sata「建议收藏」现如今的电脑主板,m.2/ngff接口相当普遍。具有minipcie接口的主板大概只有工控主板和老款的主板的了。并不代表minipcie就会别淘汰了。对于消费用户,minipcie接口就是装个无线网卡。但在某些情况下,并不需要这无线网卡功能,这一接口就作他用了,比如扩展nvme固态盘或者傲腾呢?!nvme固态盘/傲腾是m.2/ngff接口的,接口不一样,不能直接安装。但是可以通过接口转接板将minipcie接口转换成m.2/ngff接口来安装nvme固态盘,NICE!常见的nvme固态盘尺寸

  • 惠普笔记本电脑自动关机是什么原因_台式老电脑经常自动关机怎么办


  • spring boot dubbo配置(上古卷轴5基础整合包)


  • SourceInsight注册码「建议收藏」


  • NVIC库函数


  • Byte数组转byte数组_java object对象转数组

    Byte数组转byte数组_java object对象转数组这里用到了java对象的序列化,即要求要转换成Byte数组的对象必须是可序列化的。java代码如下:/***对象转Byte数组**@paramobj*@return*@throwsException*/publicstaticbyte[]objectToBytes(Objectobj)throwsException{logger.debug(“object


