Python 中 Redis 库分布式锁简单分析


Python 中 Redis 库分布式锁简单分析文章插图
作者:ayuliao
来源:懒编程
简介我们常会遇到某段逻辑在相同时间段内只希望被单个实例执行 , 而在微服务架构中 , 一个程序可能会存在多个实例 , 此时就需要通过分布式锁来实现串行执行 。
最简单的分布式锁无非就是找到对于多个程序实例而言单一的存在 , 比如MySQL数据只有一个或Redis只有一个 , 此时都可以利用这单一的存在构建一个锁 , 多个程序实例要执行某段逻辑前必须先获得这个锁 , 然后才能执行 。
因为某些原因 , 上班的时候我和同事一起研究了一下Python redis库中分布式锁的实现源码 , 这里简单分享一下 。
通过pip可以安装这个库 。
pip install redis==2.10.6这里以这个库的2.10.6版本为例 , 对它Redis分布式锁源码进行简单的分析 。
代码分析实例化StrictRedis对象后 , 使用其中的lock方法便可获得一个分布式锁 。
首先看一下lock方法对应的源码 。
def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None,lock_class=None, thread_local=True):if lock_class is None:if self._use_lua_lock is None:# the first time .lock() is called, determine if we can use# Lua by attempting to register the necessary scriptstry:LuaLock.register_scripts(self)self._use_lua_lock = Trueexcept ResponseError:self._use_lua_lock = Falselock_class = self._use_lua_lock and LuaLock or Lockreturn lock_class(self, name, timeout=timeout, sleep=sleep,blocking_timeout=blocking_timeout,thread_local=thread_local)该方法提供了多个参数 , 其中:

  • name用于指定锁名
  • timeout用于指定锁的超时时间
  • sleep用于指定线程睡眠时间 , 线程争夺锁的过程本质就是一个循环 , 每过sleep秒 , 就会尝试去获取锁对象
  • blocking_timeout用于指定阻塞超时时间 , 当多个实例争夺锁时 , 这个时间就是实例等待锁的最长时间
  • lock_class表示使用锁的类对象
  • thread_local表示是否线程安全
方法中最关键的一句代码为 lock_class = self._use_lua_lock and LuaLock or Lock, 确定了lock_class后 , 便实例化该lock_class即可 。
【Python 中 Redis 库分布式锁简单分析】lock_class可以为LuaLock也可为Lock , 经过简单分析 , Lock类才是关键 , LuaLock类继承自Lock , 通过Lua代码实现Redis的一些操作 , 这里着重看Lock类 。
首先看到该类的 __init__ 方法 。
class Lock(object):def __init__(self, redis, name, timeout=None, sleep=0.1,blocking=True, blocking_timeout=None, thread_local=True):self.redis = redisself.name = nameself.timeout = timeoutself.sleep = sleepself.blocking = blockingself.blocking_timeout = blocking_timeoutself.thread_local = bool(thread_local)self.local = threading.local() if self.thread_local else dummy()self.local.token = Noneif self.timeout and self.sleep > self.timeout:raise LockError("'sleep' must be less than 'timeout'")__init__ 方法初始化不同的属性 , 其中self.local为线程的本地字段 , 用于存储该线程特有的数据 , 不与其他线程进行共享 。
此外 , 在 __init__ 方法中对timeout与sleep进行的判断 , 如果线程等待锁时的睡眠时间大于锁的超时时间 , 则直接返回错误 。
接着重点看Lock类中的acquire方法 , 该方法代码如下 。
import time as mod_timeclass Lock(object):def acquire(self, blocking=None, blocking_timeout=None):sleep = self.sleeptoken = b(uuid.uuid1().hex)if blocking is None:blocking = self.blockingif blocking_timeout is None:blocking_timeout = self.blocking_timeoutstop_trying_at = Noneif blocking_timeout is not None:stop_trying_at = mod_time.time() + blocking_timeoutwhile 1:if self.do_acquire(token):self.local.token = tokenreturn Trueif not blocking:return Falseif stop_trying_at is not None and mod_time.time() > stop_trying_at:return Falsemod_time.sleep(sleep)acquire方法的主逻辑就是一个死循环 , 在死循环中调用do_acquire方法获取Redis分布式锁 , 如果成功获得锁 , 则将token存储到当前线程的local对象中 , 如果没有获得 , 则判断blocking , 如果blocking为Flase , 则不再阻塞 , 直接返回结果 , 反之 , 则判断当前时间是否超过blocking_timeout , 超过 , 同样返回False , 反之 , 通过sleep方法让当前线程睡眠sleep秒 。
进一步分析do_acquire方法 , 代码如下:
def do_acquire(self, token):if self.redis.setnx(self.name, token):if self.timeout:# convert to millisecondstimeout = int(self.timeout * 1000) # 转成毫秒self.redis.pexpire(self.name, timeout)return Truereturn Falsedo_acquire方法中 , 一开始通过redis的setnx方法将name对着作为key , token作为value , setnx方法只有在key不存的情况下 , 才能正常的将value存入Redis中 , 若key依存 , 该方法不做任何操作 , 此时就相当于没有获取到锁 。