Python 多线程之 Redis 分布式锁


前言


在很多互联网产品应用中,有些场景需要加锁处理,例如:双11秒杀,全局递增ID,楼层生成等等。

大部分的解决方案是基于 DB 实现的,Redis 为单进程单线程模式,采用队列模式将并发访问变成串行访问,且多客户端对 Redis 的连接并不存在竞争关系。

其次 Redis 提供一些命令SETNX,GETSET,可以方便实现分布式锁机制。

Python代码实现

import time
import redis
import threading
 
#使用连接池方式连接redis
redis_pool=redis.ConnectionPool(host="127.0.0.1",port=6379)
redis_conn=redis.Redis(connection_pool=redis_pool)
 
#定义redis类
class RedisLock():
    def __init__(self):
        self.redis_conn = redis_conn
        print("init the redis connection")
 
    #获取锁
    def get_lock(self,name,value):
        while True:
            # set(name, value, ex=None, px=None, nx=False, xx=False)
            # nx - 如果设置为True,则只有name不存在时,当前set操作才执行
            # ex - 过期时间(秒)
            result=self.redis_conn.set(name,value,nx=True,ex=3)
            # print(result)
            if(result):
                # 获取到result后就终止while循环
                break
            # time.sleep(0.5)
 
    #释放锁
    def release_lock(self,name,value):
        #获取原name key对应的value
        old_value = redis_conn.get(name)
        print("--------------------------------the key =%s ;the name=%s"%(name,old_value))
        #判断原value 与 要释放的值是否相同
        if(old_value == value):
            #相同就从redis里面释放
            self.redis_conn.delete(name)
            print("release the lock is success")
 
def redis_lock_test(lock,name,value):
    try:
        print("% --start to work"%name)
        print("% --ready get the lock and execute lock operation"%name)
        lock.get_lock(name,value)#这里是获取锁操作
        print("% --get the lock and continue to operation"%name)
    except Exception as e:
        print("the exception is:%s"%str(e))
    finally:
        print("% --ready release the lock"%name)
        lock.release_lock(name,value)#最终必须释放锁操作
        print("% --release the lock is over"%name)
 
if __name__ == '__main__':
    start_time=time.time()
    rs=RedisLock()
    tasks=[]
    for i in range(1,3):
        # 创建线程
        t = threading.Thread(target=redis_lock_test(rs,"task-name%"%i,"lock%d"%i))
        # 将创建的线程放入列表
        tasks.append(t)
        # 启动线程
        t.start()
    #这里没有设置守护线程且没有设置join函数的timeout参数时,主线程将会一直等待,直到子线程全部结束,主线程才结束,程序退出
    [t.join() for t in tasks]
    print("total waster time is:",time.time()-start_time)

注意:
1. thread.setDaemon(True) 当设置守护线程 join 函数的参数 timeout=2 时,主线程将会等待多个子线程 timeout 的累加和这样的一段时间,时间一到,主线程结束,杀死未执行完的子线程,程序退出。

2. 当没有设置守护线程且 join 函数的参数 timeout=2 时,主线程将会等待多个子线程 timeout 的累加和这样的一段时间,时间一到主线程结束,但是并没有杀死子线程,子线程依然可以继续执行,直到子线程全部结束,程序退出。

欢迎关注【无量测试之道】公众号,回复【领取资源】

Python+Unittest框架API自动化、

Python+Unittest框架API自动化、

Python+Pytest框架API自动化、

Python+Pandas+Pyecharts大数据分析、

Python+Selenium框架Web的UI自动化、

Python+Appium框架APP的UI自动化、

Python编程学习资源干货、

资源和代码 免费送啦~
文章下方有公众号二维码,可直接微信扫一扫关注即可。

备注:我的个人公众号已正式开通,致力于IT互联网技术的分享。

包含:数据分析、大数据测试、机器学习、测试开发、API接口自动化、测试运维、UI自动化、性能测试、代码检测、编程技术等。

微信搜索公众号:“无量测试之道”,或扫描下方二维码:

添加关注,让我们一起共同成长!