悲观锁和乐观锁 悲观锁:一开始就认为着里会出现锁的竞争,给自己加一把锁
比如一些人为了防止犯错误,一开始就要等到时机非常成熟的时候才会行动,比如锁一样,他拿到一把锁之后才去执行,抱着一种悲观的态度。
乐观锁:不关心执行的时出现错误,如果出现问题我重新执行一次,不停的尝试
有一些人呢,他就是愿意去尝试,什么都没准备好就开始做了,失败了我重新尝试,抱着一种乐观的态度。
基于 Mysql 乐观锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 import threadingfrom peewee import *from inventory_srv.config import configclass Inventory (Model ): goods = IntegerField(verbose_name="商品id" , unique=True ) stocks = IntegerField(verbose_name="库存数量" , default=0 ) version = IntegerField(verbose_name="版本号" , default=0 ) class Meta : database = config.DB def sell (): goods_list = [(1 , 10 ), (2 , 20 ), (3 , 30 )] with config.DB.atomic() as txn: for goods_id, num in goods_list: while True : goods_inv = Inventory.get(Inventory.id == goods_id) print (f"当前的版本号:{goods_inv.version} " ) print (f"商品{goods_id} 售出 {num} 件" ) import time from random import randint time.sleep(randint(1 , 3 )) if goods_inv.stocks < num: print (f"商品:{goods_id} 库存不足" ) txn.rollback() break else : query = Inventory.update(stocks=Inventory.stocks - num, version=Inventory.version + 1 ).where( Inventory.id goods_id, Inventory.version == goods_inv.version) ok = query.execute() if ok: print ("更新成功" ) break else : print ("更新失败" ) if __name__ == "__main__" : config.DB.create_tables([Inventory]) t1 = threading.Thread(target=sell) t2 = threading.Thread(target=sell) t1.start() t2.start() t1.join() t2.join()
基于 Redis 分布式锁 Python Redis驱动: https://github.com/redis/redis-py
Python 基本的 Redis 分布式锁 当并发很高的时候使用 get();set()
可能会存在同时获取数据的问题,一定要使用 setnx
来保证数据的原子性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 import threadingfrom peewee import *from inventory_srv.config import config, server_configimport redisclass Inventory (Model ): goods = IntegerField(verbose_name="商品id" , unique=True ) stocks = IntegerField(verbose_name="库存数量" , default=0 ) version = IntegerField(verbose_name="版本号" , default=0 ) class Meta : database = config.DB class RedisLock : def __init__ (self, key, uuid=None ): if uuid is None : import uuid uuid = uuid.uuid4() self.id = uuid self.redis_client = redis.Redis(host=server_config.REDIS_HOST, port=server_config.REDIS_PORT) self.key = f"{server_config.REDIS_PREFIX['inventory' ]} {key} " def acquire (self ): while True : if self.redis_client.set (self.key, self.id , nx=True , ex=15 ): break else : import time time.sleep(1 ) def release (self ): id = self.redis_client.get(self.key) print (f"释放{self.key} " ) if id is not self.id : print ("不能删除不属于自己的出锁" ) return self.redis_client.delete(self.key) def sell (): goods_list = [(1 , 10 ), (2 , 20 ), (3 , 30 )] with config.DB.atomic() as txn: for goods_id, num in goods_list: lock = RedisLock(key=goods_id) goods_inv = Inventory.get(Inventory.id == goods_id) print (f"商品{goods_id} 售出 {num} 件" ) import time from random import randint time.sleep(randint(1 , 2 )) if goods_inv.stocks < num: print (f"商品:{goods_id} 库存不足" ) txn.rollback() break else : lock.acquire() query = Inventory.update(stocks=Inventory.stocks - num).where(Inventory.id == goods_id) ok = query.execute() if ok: print (f"商品:{goods_id} 更新成功" ) else : print (f"商品:{goods_id} 更新失败" ) lock.release() if __name__ == "__main__" : config.DB.create_tables([Inventory, InventoryHistory]) t1 = threading.Thread(target=sell) t2 = threading.Thread(target=sell) t3 = threading.Thread(target=sell) t4 = threading.Thread(target=sell) t5 = threading.Thread(target=sell) t1.start() t2.start() t3.start() t4.start() t5.start() t1.join() t2.join() t3.join() t4.join() t5.join()
python-redis-lock https://github.com/ionelmc/python-redis-lock
使用pip 安装或着拷贝代码https://github.com/ionelmc/python-redis-lock/blob/master/src/redis_lock/__init__.py
比较简单就不演示了