开发者

redis和redission分布式锁原理及区别说明

目录
  • Redis和redission分布式锁原理及区别
    • 1、有的同伴想到了synchronized关键字锁
    • 2、有的小伙伴可能想到了乐观锁
    • 3、使用redis的分布式锁
    • 4、使用redission的分布式锁
  • 总结

    redis和redission分布式锁原理及区别

    我最近做租车项目,在处理分布式时用到分布式锁,我发现很多同事都在网上找分布式锁的资料,但是看的资料都不是很全,所以在这里我谈谈自己的分布式锁理解。

    结合我的其中某一业务需求:多个用户在同一个区域内发现只有一辆可租的车,最终结果肯定只有一位用户租车成功,这就产生了多线程(多个用户)抢同一资源的问题。

    1、有的同伴想到了synchronized关键字锁

    暂且抛开性能问题,项目为了高可用,都会做集群部署,那么synchronized就失去了加锁的意义,这里多嘴解释一下:

    redis和redission分布式锁原理及区别说明

    2、有的小伙伴可能想到了乐观锁

    没错!!乐观锁可以解决的我的问题,但是在高并发的场景,频繁的操作数据库,数据库的资源是很珍贵的,并且还存在性能的问题。但是我这里简单说下乐观锁的使用:

    • 我们在车的表中添加一个字段:version(int类型)(建议使用这个名称,这样别人看到就会直觉这是乐观锁字段,也可以使用别的名称)
    • 查询出该车的数据,数据中就有version字段,假如version=1
    select * from u_car where car_id = 10;
    • 修改该车的状态为锁定
    update u_car set status = 2,version = version +1 where car_id = 10 and version = 1

    在修改的时候将version作为参数,如果其他用户锁车,那么version已经发生变化(version = version +1),所以version = 1不成立,修改失败

    乐观锁不是本次的终点,但还是简单说下;

    3、使用redis的分布式锁

    	public boolean lock(String key, V v, int expireTime){
               //获取锁
               //在redis早期版本中,设置key和key的存活时间是分开的,设置key成功,但是设置存活时间时服务宕机,那么你的key就永远不会过期,有BUG
               //后来redis将加锁和设置时间用同一个命令
               //这里是重点,redis.setNx(key,value,time)方法是原子性的,设置key成功说明锁车成功,如果失败说明该车被别人租了
             boolean b = false;
             try {
             	b = redis.setNx(key, v, expireTime);
             } catch (Exception e) {
            	log.error(e.getMessage(), e);
        	 }
        	 return b;
        }
        publphpic boolean unlock(String key){
            return redwww.devze.comis.delete(key);
        }
    }
    

    但是这样写还是存在BUG的,我的key设置了加锁时间为5秒,但是我的业务逻辑5秒还没有执行完成,key过期了,那么其他用户执行redis.setNx(key, v, expireTime)时就成功了,将该车锁定,又产生了抢资源;我们想一下,如果我能够在业务逻辑没有执行完的时候,让锁过期后能够延长锁的时间,是不是就解决了上面的BUG;

    实现这个锁的延长,非要自己动手的话就得另启一个线程来监听我们的业务线程,每隔1秒监测当前业务线程是否执行完成,如果没有就获取key的存活时间,时间小于一个阈值时,就自动给key设置N秒;当然,我们可以不用自己动手,redission已经帮我们实现key的时间时间过期问题;

    4、使用redission的分布式锁

    //引入依赖
    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson-spring-boot-starter</artifactId>
                <version>3.10.6</version>
            </dependency>
    

    redisson支持单点、集群等模式,这里选择单点的。

    • application.yml配置好redis的连接:
    spring:  
        redis:
            host: 127.0.0.1
            port: 6379
            password: 
    
    • 配置redisson的客户端bean
    @Configuration
    public class RedisConfig {
        @Value("${spring.redis.host}")
        private String host;
     
        @Bean(name = {"redisTemplate", "stringRedisTemplate"})
        public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
            StringRedisTemplate redisTemplate = new StringRedisTemplate();
            redisTemplate.setConnectionFactory(factory);
            return redisTemplate;
        }
     
        @Bean
        public Redisson redisson() {
            Config config = new Config();
            config.useSingleServer().setAddress("redis://" + host + ":6379");
            return (Redisson) Redisson.create(config);
        }
     
    }
    
    • 加锁使用
    private Logger log = LoggerFactory.getLogger(getClass());
    @Resource
    private Redisson redisson;
    //加锁
    public Boolean lock(String key,long waitTime,long leaseTime){
    	Boolean  b = false;
    	try {
            RLock rLock = redisson.getLock(key);
            //说下参数 waitTime:锁的存活时间 leaseTime:锁的延长时间 后面的参数是单位
            b = rLock.tryLock(waitTime,leaseTime,TimeUnit.SECONDS);
          } catch (Exception e) {
             log.error(e.getMessage(), e);
          } 
        }
        return b;
    }
    //释放锁
    public void unlock(String key){
    	try {
    		RLock rLock = redisson.getLock(key);
    		if(null!=lock){
    			lock.unlock();
    			lock.forceUnlock();
    			fileLog.info("unlock succesed");编程客栈
        	}
        } catch (Exception e) {
            fileLog.error(e.getMessage(), e);
        }
    }
    
    • 带大家看下tryLock方法的实现源码:
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
            long time = unit.toMillis(waitTime);
            long current = System.currentTimeMillis();
            long threadId = Thread.currentThread().getId();
            //尝试获取锁,如果没取到锁,则获取锁的剩余超时时间
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
            //如果waitTime已经超时了,就返回false
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            
            current = System.currentTimeMillis();
            RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
            if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
                if (!subscribeFuture.cancel(false)) {
                    subscribeFuture.onComplete((res, e) -> {
                        if (e == null) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    });
                }
                acquireFailed(threadId);
                return false;
            }
     
            try {
                time -= System.currentTimeMillis() - current;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
                //进入死循环,反复去调用tryAcquire尝试获取锁,ttl为null时就是别的线程已经unlock了
                while (true) {
                    long currentTime = System.currentTimeMillis();
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        return true;
                    }
     
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                    编程客栈    acquireFailed(threadId);
                        return false;
                    }
     
                    // waiting for message
     编程客栈               currentTime = System.currentTimeMillis();
                    if (ttl >= 0 && ttl < time) {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                    }
     
                    time -= System.currentTimeMillis() - currentTime;
                    if (time <= 0) {
                        acquireFailed(threadId);
                        return false;
                    }
                }
            } finally {
                unsubscribe(subscribeFuture, threadId);
            }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
        }
    

    可以看到,其中主要的逻辑就是尝试加锁,成功了就返回true,失败了就进入死循环反复去尝试加锁。中途还有一些超时的判断。逻辑还是比较简单的。

    • 再看看tryAcquire方法

    redis和redission分布式锁原理及区别说明

    • 这个方法的调用栈也是比较多,之后会进入下面这个方法

    redis和redission分布式锁原理及区别说明

    上面的Lua(俗称胶水语言)脚本比较重要,主要是为了执行命令的原子性解释一下:

    • KEYS[1]代表你的key
    • ARGV[1]代表你的key的存活时间,默认存活30秒
    • ARGV[2]代表的是请求加锁的客户端ID,后面的1则理解为加锁的次数,简单理解就是 如果该客户端多次对key加锁时,就会执行hincrby原子加1命令

    第一段if就是判断你的key是否存在,如果不存在,就执行redis call(hset key ARGV[2],1)加锁和设置redis call(pexpire key ARGV[1])存活时间;

    当第二个客户来加锁时,第一个if判断已存在key,就执行第二个if判断key的hash是否存在客户端2的ID,很明显不是;

    则进入到最后的return返回该key的剩余存活时间

    当加锁成功后会在后台启动一个watch dog(看门狗)线程,key的默认存活时间为30秒,则watch dog每隔10秒钟就会检查一下客户端1是否还持有该锁,如果持有,就会不断的延长锁key的存活时间

    所以这里建议大家在设置key的存活时间时,最好大于10秒,延续时间也大于等于10秒

    所以,总体流程应该是这样的。

    redis和redission分布式锁原理及区别说明

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新数据库

    数据库排行榜