大家好,又见面了,我是你们的朋友全栈君。
Redis锁的实现:
由于Redis是单进程的,可以简单用setnx这个命令进行加锁操作,谁能操作成功,谁就可以获得锁。简单的代码如下:
def acquire_lock():
# identifier: 唯一标识客户端
# lockname 锁名字
# redis 客户端连接
if redis.setnx(lockname, identifier):
return True
return False
这里有一个问题,就是如果客户端在获得锁的时候崩溃了,服务器就无法再把锁分配给其他客户端使用了,为了解决这个问题,我们可以利用redis的超时特性,给锁加上超时时间
def acquire_lock():
# identifier: 唯一标识客户端
# lockname 锁名字
# redis 客户端连接
# timeout 超时时间
if redis.setnx(lockname, identifier):
redis.expire(lockname, timeout)
return True
elif not redis.ttl(lockname):
redis.expire(lockname, timeout)
return False
return False
可以这样认为,多个客户端同时设置过期时间也是差别不大的,我们在发现锁已经存在并且没有超时限制时,给锁加上超时限制,这样可以在其他客户端获得锁并未设置超时时间崩溃了,也能在过期时间到了让其他客户端获取到锁。最新官方文档支持用set命令指定超时和nx特性,
def acquire_lock():
# identifier: 唯一标识客户端
# lockname 锁名字
# redis 客户端连接
# timeout 超时时间
if redis.set(lockname, identifier, nx=True, ex=timeout):
return True
return False
解锁操作,直接执行一段lua脚本
def release_lock():
# identifier: 唯一标识客户端
# lockname 锁名字
# redis 客户端连接
script = “””
if redis.call(‘GET’, KEYS[1]) == ARGV[1] then
return redis.call(‘DEL’, KEYS[1])
else
return 0
“””
return redis.eval(script, lockname, identifier)
使用lua脚本可以原子的操作解锁过程,这里需要注意点,eval的key是要传的,这样代码也可以在redis集群中使用,否则redis不知道lua脚本应该在哪一个槽进行执行,具体可以看官方的文档
另外一种经常使用的计数信号量的使用,最简单的方式是用一个zset,通过客户端设置过期时间
def acquire_semaphore():
# identifier: 唯一标识客户端
# semname 信号量集合名
# redis 客户端连接
# timeout 过期时间
# limit 信号量的计数
now = time.time()
pipe = redis.pipeline(True)
pipe.zremrangebyscore(semname, ‘-inf’, now-timeout)
pipe.zadd(semname, identifier, now)
pipe.zrank(semname, identifier)
if pipe.execute()[-1] < limit:
return True
redis.zrem(semname, identifier)
return False
这里有一个问题,就是如果某一个客户端的时间比较快,那么这个较快的客户端是有可能在信号量已经达到限制的时候获得新号量的。
比如A客户端比其他客户端快50ms,那么当最后一个信号量被取走的同时,如果这时候A请求信号量,根据代码是有可以获得信号量的,不是公平的
为了解决公平的问题,可以引入多一个有序集合,记录获取信号量的计数
def acquire_fair_semaphore():
# identifier: 唯一标识客户端
# semname 信号量集合名
# redis 客户端连接
# timeout 过期时间
# limit 信号量的计数
czset = semname + “:owner”
ctr = semname + “:counter”
now = time.time()
pipe = redis.pipeline(True)
pipe.zramrangebyscore(semname, ‘-inf’, now-timeout)
pipe.zinterstore(czset, {czset:1, semname: 0})
pipe.incr(ctr)
counter = pipe.execute()[-1]
pipe.zadd(semname, identifier, now)
pipe.zadd(czset, identifier, counter)
pipe.zrank(czset, identifier)
if pipe.execute()[-1] < limit:
return True
pipe.zrem(semname, identifier)
pipe.zrem(czset, identifier)
pipe.execute()
return False
这个看似没啥问题的代码还是有一点问题,假设剩下最后一个信号量,AB两个客户端依次执行incr获得counter = 10,11
这个时候如果B先执行了zrank,那么会导致A和B都同时获得了最后一个信号量,如果场景不在乎这点小问题也没问题,不然可以在这段操作前,先获取前面的锁,再进行操作。
上面介绍的锁其实有一个假设是Redis服务器没有挂,如果Redis服务器挂了,有主从切换的话会切换到从服务器,但是从服务器并不一定与主服务器数据完全一致,取决于同步的方式,假设A获得锁之后服务器挂了,这个信息没有记录到从服务器中,从服务器起来的时候是无锁状态的,有可能会造成两个客户端同时获取了锁,一种解决方法是如果发生主从切换的话,暂停新启动的服务器使用锁的时间,超过过期时间即可。
基于Redis实现的一种分布式锁,RedLock算法
客户端以当前时间毫秒级向多个独立的Redis实例请求锁,超过实例总数(N) N/2+1的客户端获取到这把锁,而释放锁的操作只需要在所有的实例执行释放锁操作。
具体可以参考官方文档:https://redis.io/topics/distlock
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/149192.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...