Redis分布式锁

Redis分布式锁

Redis之分布式锁的实现方案 – 如何优雅地实现分布式锁(JAVA)

博客地址 https://blog.piaoruiqing.com/blog/2019/05/19/redis分布式锁

关键词

  • 分布式锁: 是控制分布式系统之间同步访问共享资源的一种方式。
  • spring-data-redis: Spring针对redis的封装, 配置简单, 提供了与Redis存储交互的抽象封装, 十分优雅, 也极具扩展性, 推荐读一读源码
  • Lua: Lua 是一种轻量小巧的脚本语言, 可在redis执行.

前言

本文阐述了Redis分布式锁的一种简单JAVA实现及优化进阶, 实现了自动解锁、自定义异常、重试、注解锁等功能, 尝试用更优雅简洁的代码完成分布式锁.

需求

  • 互斥性: 在分布式系统环境下, 一个锁只能被一个线程持有.
  • 高可用: 不会发生死锁、即使客户端崩溃也可超时释放锁.
  • 非阻塞: 获取锁失败即返回.

方案

Redis具有极高的性能, 且其命令对分布式锁支持友好, 借助SET命令即可实现加锁处理.

SET

  • EX seconds — Set the specified expire time, in seconds.
  • PX milliseconds — Set the specified expire time, in milliseconds.
  • NX — Only set the key if it does not already exist.
  • XX — Only set the key if it already exist.

实现

简单实现

做法为set if not exist(如果不存在则赋值), redis命令为原子操作, 所以单独使用set命令时不用担心并发导致异常.

具体代码实现如下: (spring-data-redis:2.1.6)

依赖引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.4.RELEASE</version>
</dependency>

配置RedisTemplate

@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {

    StringRedisSerializer keySerializer = new StringRedisSerializer();
    RedisSerializer<?> serializer = new StringRedisSerializer();
    StringRedisTemplate template = new StringRedisTemplate();
    template.setConnectionFactory(factory);
    template.setKeySerializer(keySerializer);
    template.setHashKeySerializer(keySerializer);
    template.setValueSerializer(serializer);
    template.setHashValueSerializer(serializer);
    template.afterPropertiesSet();
    return template;
}

简单的分布式锁实现

/**
 * try lock
 * @author piaoruiqing
 * 
 * @param key       lock key
 * @param value     value
 * @param timeout   timeout
 * @param unit      time unit
 * @return 
 */
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {      

    return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}

以上代码即完成了一个简单的分布式锁功能:

其中redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); 即为执行redis命令:

redis> set dlock:test-try-lock a EX 10 NX
OK
redis> set dlock:test-try-lock a EX 10 NX
null

早期版本spring-data-redis分布式锁实现及注意事项

方法Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit);是在2.1版本中新增的, 早期版本中setIfAbsent无法同时指定过期时间, 若先使用setIfAbsent再设置key的过期时间, 会存在产生死锁的风险, 故旧版本中需要使用另外的写法进行实现. 以spring-data-redis:1.8.20为例

/**
 * try lock
 * @author piaoruiqing
 * 
 * @param key       lock key
 * @param value     value
 * @param timeout   timeout
 * @param unit      time unit
 * @return 
 */
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {

    return redisTemplate.execute(new RedisCallback<Boolean>() {
        @Override
        public Boolean doInRedis(RedisConnection connection) throws DataAccessException {

            JedisCommands commands = (JedisCommands)connection.getNativeConnection();
            String result = commands.set(key, value, "NX", "PX", unit.toMillis(timeout));

            return "OK".equals(result);
        }
    });
}

spring-data-redis:1.8.20默认redis客户端为jedis, 可通过getNativeConnection直接调用jedis方法进行操作. 新旧版本实现方式最终效果相同.

优化进阶

基于AOP实现分布式锁注解工具 – 不仅能用, 而且好用

优化一 (自动解锁及重试)

自动解锁、重试: 上一节针对分布式锁的简单实现可满足基本需求, 但仍有较多可优化改进之处, 本小节将针对分布式锁自动解锁及重试进行优化

分布式锁抽象类

实现AutoCloseable接口, 可使用try-with-resource方便地完成自动解锁.

/**
 * distributed lock
 * @author piaoruiqing
 *
 * @since JDK 1.8
 */
abstract public class DistributedLock implements AutoCloseable {

    private final Logger LOGGER = LoggerFactory.getLogger(getClass());

    /**
     * release lock
     * @author piaoruiqing
     */
    abstract public void release();

    /*
     * (non-Javadoc)
     * @see java.lang.AutoCloseable#close()
     */
    @Override
    public void close() throws Exception {

        LOGGER.debug("distributed lock close , {}", this.toString());

        this.unlock();
    }
}

封装Redis分布式锁

RedisDistributedLock是Redis分布式锁的抽象, 继承了DistributedLock并实现了unlock接口.

/**
 * redis distributed lock
 *
 * @author piaoruiqing
 * @date: 2019/01/12 23:20
 *
 * @since JDK 1.8
 */
public class RedisDistributedLock extends DistributedLock {

    private RedisOperations<String, String> operations;
    private String key;
    private String value;

    private static final String COMPARE_AND_DELETE =        // (一)
        "if redis.call('get',KEYS[1]) == ARGV[1]\n" +
        "then\n" +
        "    return redis.call('del',KEYS[1])\n" +
        "else\n" +
        "    return 0\n" +
        "end";

    /**
     * @param operations
     * @param key
     * @param value
     */
    public RedisDistributedLock(RedisOperations<String, String> operations, String key, String value) {
        this.operations = operations;
        this.key = key;
        this.value = value;
    }
    /*
     * (non-Javadoc)
     * @see com.piaoruiqing.demo.distributed.lock.DistributedLock#release()
     */
    @Override
    public void release() {                                 // (二)
        List<String> keys = Collections.singletonList(key);
        operations.execute(new DefaultRedisScript<String>(COMPARE_AND_DELETE), keys, value);
    }
    /*
     * (non-Javadoc)
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
    }
}
  • (一): 通过Lua脚本进行解锁, 使对比锁的值+删除成为原子操作, 确保解锁操作的正确性. 简单来说就是防止删了别人的锁.
    例如: 线程A方法未执行完毕时锁超时了, 随后B线程也获取到了该锁(key相同), 但此时如果A线程方法执行完毕尝试解锁, 如果不比对value, 那么A将删掉B的锁, 这时候C线程又能加锁, 业务将产生更严重的混乱.(不要过分依赖分布式锁, 在数据一致性要求较高的情况下, 数据库层面也要进行一定的处理, 例如唯一键约束、事务等来确保数据的正确)
  • (二): 使用RedisOperations执行Lua脚本进行解锁操作.
  • 可参阅redis官方文档

加锁方法实现

/**
 * @author piaoruiqing
 * @param key           lock key
 * @param timeout       timeout
 * @param retries       number of retries
 * @param waitingTime   retry interval
 * @return
 * @throws InterruptedException
 */
public DistributedLock acquire(String key, long timeout, int retries, long waitingTime) throws InterruptedException {
    final String value 
        = RandomStringUtils.randomAlphanumeric(4) + System.currentTimeMillis(); // (一)
    do {
        Boolean result 
            = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); // (二)
        if (result) {
            return new RedisDistributedLock(stringRedisTemplate, key, value);
        }
        if (retries > NumberUtils.INTEGER_ZERO) {
            TimeUnit.MILLISECONDS.sleep(waitingTime);
        }
        if(Thread.currentThread().isInterrupted()){
            break;
        }
    } while (retries-- > NumberUtils.INTEGER_ZERO);

    return null;
}
  • (一): 锁值要保证唯一, 使用4位随机字符串+时间戳基本可满足需求
    注: UUID.randomUUID()在高并发情况下性能不佳.
  • (二): 尝试加锁, 代码中是2.1版本的做法, 早起版本参考上一节的实现.

此代码已经可以满足自动解锁和重试的需求了, 使用方法:

// 根据key加锁, 超时时间10000ms, 重试2次, 重试间隔500ms
try(DistributedLock lock = redisLockService.acquire(key, 10000, 2, 500);){
    // do something
}

但还可以再优雅一点, 将模板代码封装起来, 可支持Lambda表达式:

/**
 * lock handler
 * @author piaoruiqing
 *
 * @since JDK 1.8
 */
@FunctionalInterface                // (一)
public interface LockHandler<T> {

    /**
     * the logic you want to execute
     * 
     * @author piaoruiqing
     *
     * @return
     * @throws Throwable
     */
     T handle() throws Throwable;   // (二)
}
  • (一): 定义函数式接口, 将业务逻辑放入Lambda表达式使代码更加简洁.
  • (二): 业务中的异常不建议在分布式锁中处理, 直接抛出来更合理.

使用LockHandler完成加锁的实现:

public <T> T tryLock(String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime) throws Throwable {
    try (DistributedLock lock = this.acquire(key, timeout, retries, waitingTime);) {
        if (lock != null) {
            LOGGER.debug("get lock success, key: {}", key);
            return handler.handle();
        }
        LOGGER.debug("get lock fail, key: {}", key);
        return null;
    }
}

此时可以通过比较优雅的方式使用分布式锁来完成编码:

@Test
public void testTryLock() throws Throwable {
    final String key = "dlock:test-try-lock";
    AnyObject anyObject = redisLockService.tryLock(key, () -> {
        // do something
        return new AnyObject();
    }, 10000, true, 0, 0);
}
[版权声明]
本文发布于朴瑞卿的博客, 允许非商业用途转载, 但转载必须保留原作者朴瑞卿 及链接:https://blog.piaoruiqing.com.
如有授权方面的协商或合作, 请联系邮箱: piaoruiqing@gmail.com.

优化二 (自定义异常)

自定义异常: 前文中针对分布式锁的封装可满足多数业务场景, 但是考虑这样一种情况, 如果业务本身会返回NULL当前的实现方式可能会存在错误的处理, 因为获取锁失败也会返回NULL. 避免返回NULL固然是一种解决方式, 但无法满足所有的场景, 此时支持自定义异常或许是个不错的选择.

实现起来很容易, 在原代码的基础之上增加onFailure参数, 如果锁为空直接抛出异常即可.

加锁方法实现

public <T> T tryLock(String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime, Class<? extends RuntimeException> onFailure) throws Throwable {   // (一)
    try (DistributedLock lock = this.getLock(key, timeout, retries, waitingTime);) {
        if (lock != null) {
            LOGGER.debug("get lock success, key: {}", key);
            return handler.handle();
        }
        LOGGER.debug("get lock fail, key: {}", key);
        if (null != onFailure) {
            throw onFailure.newInstance();  // (二)
        }
        return null;
    }
}
  • (一): Class<? extends RuntimeException>限定onFailure必须是RuntimeException或其子类. 笔者认为使用RuntimeException在语义上更容易理解. 如有需要使用其他异常也未尝不可(如获取锁失败需要统一处理等情况).
  • (二): 反射

优化三 (优雅地使用注解)

结合APO优雅地使用注解完成分布式锁:

定义注解

为了减小篇幅折叠部分注释

/**
 * distributed lock
 * @author piaoruiqing
 * @date: 2019/01/12 23:15
 *
 * @since JDK 1.8
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {

    /** timeout of the lock */
    long timeout() default 5L;

    /** time unit */
    TimeUnit unit() default TimeUnit.MILLISECONDS;

    /** number of retries */
    int retries() default 0;

    /** interval of each retry */
    long waitingTime() default 0L;

    /** key prefix */
    String prefix() default "";

    /** parameters that construct a key */
    String[] argNames() default {};

    /** construct a key with parameters */
    boolean argsAssociated() default true;

    /** whether unlock when completed */
    boolean autoUnlock() default true;

    /** throw an runtime exception while fail to get lock */
    Class<? extends RuntimeException> onFailure() default NoException.class;

    /** no exception */
    public static final class NoException extends RuntimeException {

        private static final long serialVersionUID = -7821936618527445658L;

    }
}
  • timeout: 超时时间
  • unit: 时间单位
  • retries: 重试次数
  • waitingTime: 重试间隔时间
  • prefix: key前缀, 默认为包名+类名+方法名
  • argNames: 组成key的参数

注解可使用在方法上, 需要注意的是, 本文注解通过spring AOP实现, 故对象内部方法间调用将无效.

切面实现

/**
 * distributed lock aspect
 * @author piaoruiqing
 * @date: 2019/02/02 22:35
 *
 * @since JDK 1.8
 */
@Aspect
@Order(10)  // (一)
public class DistributedLockableAspect implements KeyGenerator {    // (二)

    private final Logger LOGGER = LoggerFactory.getLogger(getClass());
    @Resource
    private RedisLockClient redisLockClient;
    /**
     * {@link DistributedLockable}
     * @author piaoruiqing
     */
    @Pointcut(value = "execution(* *(..)) && @annotation(com.github.piaoruiqing.dlock.annotation.DistributedLockable)")
    public void distributedLockable() {}

    /**
     * @author piaoruiqing
     *
     * @param joinPoint
     * @param lockable
     * @return
     * @throws Throwable
     */
    @Around(value = "distributedLockable() && @annotation(lockable)")
    public Object around(ProceedingJoinPoint joinPoint, DistributedLockable lockable) throws Throwable {
        long start = System.nanoTime();
        final String key = this.generate(joinPoint, lockable.prefix(), lockable.argNames(), lockable.argsAssociated()).toString();
        Object result = redisLockClient.tryLock(
            key, () -> {
                return joinPoint.proceed();
            }, 
            lockable.unit().toMillis(lockable.timeout()), lockable.autoUnlock(), 
            lockable.retries(), lockable.unit().toMillis(lockable.waitingTime()),
            lockable.onFailure()
        );
        long end = System.nanoTime();
        LOGGER.debug("distributed lockable cost: {} ns", end - start);
        return result;
    }
}
  • (一): 切面优先级
  • (二): KeyGenerator为自定义的key生成策略, 使用 prefix+argName+arg作为key, 具体实现见源码.

此时可以通过注解的方式使用分布式锁, 这种方式对代码入侵较小, 且简洁.

@DistributedLockable(
    argNames = {"anyObject.id", "anyObject.name", "param1"},
    timeout = 20, unit = TimeUnit.SECONDS, 
    onFailure = RuntimeException.class
    )
public Long distributedLockableOnFaiFailure(AnyObject anyObject, String param1, Object param2, Long timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
        LOGGER.info("distributed-lockable: " + System.nanoTime());
    } catch (InterruptedException e) {
    }
    return System.nanoTime();
}

扩展

分布式锁的实现有多种方式, 可根据实际场景和需求选择不同的介质进行实现:

  • Redis: 性能高, 对分布式锁支持友好, 实现简单, 多数场景下表现较好.
  • Zookeeper: 可靠性较高, 对分布式锁支持友好, 实现较复杂但有现成的实现可以使用.
  • 数据库: 实现简单, 可使用乐观锁/悲观锁实现, 性能一般, 高并发场景下不推荐

结语

本文阐述了Redis分布式锁的JAVA实现, 完成了自动解锁、自定义异常、重试、注解锁等功能, 源码见地址.

本实现还有诸多可以优化之处, 如:

  • 重入锁的实现
  • 优化重试策略为订阅Redis事件: 订阅Redis事件可以进一步优化锁的性能, 可通过wait+notifyAll来替代文中的sleep.

篇幅有限, 后续再行阐述.

参考文献

0 0 投票数
文章评分
订阅评论
提醒
guest
2 评论
最旧
最新 最多投票
内联反馈
查看所有评论
allen
allen
5 年 前

拜读了您的代码,受益良多.
其中执行Lua脚本 应该修改为:
@Override
public void release() {
List keys = Collections.singletonList(key);
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setScriptText(COMPARE_AND_DELETE);
redisScript.setResultType(Boolean.class);
operations.execute(redisScript, keys,value);
}

否则会报
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: java.lang.IllegalStateException

2
0
希望看到您的想法,请您发表评论x