Redis分布式锁
Redis之分布式锁的实现方案 – 如何优雅地实现分布式锁(JAVA)
关键词
分布式锁: 是控制分布式系统之间同步访问共享资源的一种方式。spring-data-redis: Spring针对redis的封装, 配置简单, 提供了与Redis存储交互的抽象封装, 十分优雅, 也极具扩展性, 推荐读一读源码Lua: Lua 是一种轻量小巧的脚本语言, 可在redis执行.
前言
本文阐述了Redis分布式锁的一种简单JAVA实现及优化进阶, 实现了自动解锁、自定义异常、重试、注解锁等功能, 尝试用更优雅简洁的代码完成分布式锁.
需求
- 互斥性: 在分布式系统环境下, 一个锁只能被一个线程持有.
- 高可用: 不会发生死锁、即使客户端崩溃也可超时释放锁.
- 非阻塞: 获取锁失败即返回.
方案
Redis具有极高的性能, 且其命令对分布式锁支持友好, 借助SET命令即可实现加锁处理.
SET
EXseconds — Set the specified expire time, in seconds.PXmilliseconds — Set the specified expire time, in milliseconds.NX— Only set the key if it does not already exist.XX— Only set the key if it already exist.
实现
简单实现
做法为set if not exist(如果不存在则赋值), redis命令为原子操作, 所以单独使用set命令时不用担心并发导致异常.
具体代码实现如下: (spring-data-redis:2.1.6)
依赖引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>
配置RedisTemplate
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
RedisSerializer<?> serializer = new StringRedisSerializer();
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setHashKeySerializer(keySerializer);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
简单的分布式锁实现
/**
* try lock
* @author piaoruiqing
*
* @param key lock key
* @param value value
* @param timeout timeout
* @param unit time unit
* @return
*/
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
以上代码即完成了一个简单的分布式锁功能:
其中redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); 即为执行redis命令:
redis> set dlock:test-try-lock a EX 10 NX
OK
redis> set dlock:test-try-lock a EX 10 NX
null
早期版本spring-data-redis分布式锁实现及注意事项
方法
Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit);是在2.1版本中新增的, 早期版本中setIfAbsent无法同时指定过期时间, 若先使用setIfAbsent再设置key的过期时间, 会存在产生死锁的风险, 故旧版本中需要使用另外的写法进行实现. 以spring-data-redis:1.8.20为例
/**
* try lock
* @author piaoruiqing
*
* @param key lock key
* @param value value
* @param timeout timeout
* @param unit time unit
* @return
*/
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands)connection.getNativeConnection();
String result = commands.set(key, value, "NX", "PX", unit.toMillis(timeout));
return "OK".equals(result);
}
});
}
spring-data-redis:1.8.20默认redis客户端为jedis, 可通过getNativeConnection直接调用jedis方法进行操作. 新旧版本实现方式最终效果相同.
优化进阶
基于AOP实现分布式锁注解工具 – 不仅能用, 而且好用
优化一 (自动解锁及重试)
自动解锁、重试: 上一节针对分布式锁的简单实现可满足基本需求, 但仍有较多可优化改进之处, 本小节将针对分布式锁自动解锁及重试进行优化
分布式锁抽象类
实现
AutoCloseable接口, 可使用try-with-resource方便地完成自动解锁.
/**
* distributed lock
* @author piaoruiqing
*
* @since JDK 1.8
*/
abstract public class DistributedLock implements AutoCloseable {
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
/**
* release lock
* @author piaoruiqing
*/
abstract public void release();
/*
* (non-Javadoc)
* @see java.lang.AutoCloseable#close()
*/
@Override
public void close() throws Exception {
LOGGER.debug("distributed lock close , {}", this.toString());
this.unlock();
}
}
封装Redis分布式锁
RedisDistributedLock是Redis分布式锁的抽象, 继承了DistributedLock并实现了unlock接口.
/**
* redis distributed lock
*
* @author piaoruiqing
* @date: 2019/01/12 23:20
*
* @since JDK 1.8
*/
public class RedisDistributedLock extends DistributedLock {
private RedisOperations<String, String> operations;
private String key;
private String value;
private static final String COMPARE_AND_DELETE = // (一)
"if redis.call('get',KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call('del',KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
/**
* @param operations
* @param key
* @param value
*/
public RedisDistributedLock(RedisOperations<String, String> operations, String key, String value) {
this.operations = operations;
this.key = key;
this.value = value;
}
/*
* (non-Javadoc)
* @see com.piaoruiqing.demo.distributed.lock.DistributedLock#release()
*/
@Override
public void release() { // (二)
List<String> keys = Collections.singletonList(key);
operations.execute(new DefaultRedisScript<String>(COMPARE_AND_DELETE), keys, value);
}
/*
* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
}
}
- (一): 通过Lua脚本进行解锁, 使
对比锁的值+删除成为原子操作, 确保解锁操作的正确性. 简单来说就是防止删了别人的锁.
例如: 线程A方法未执行完毕时锁超时了, 随后B线程也获取到了该锁(key相同), 但此时如果A线程方法执行完毕尝试解锁, 如果不比对value, 那么A将删掉B的锁, 这时候C线程又能加锁, 业务将产生更严重的混乱.(不要过分依赖分布式锁, 在数据一致性要求较高的情况下, 数据库层面也要进行一定的处理, 例如唯一键约束、事务等来确保数据的正确) - (二): 使用
RedisOperations执行Lua脚本进行解锁操作. - 可参阅redis官方文档
加锁方法实现
/**
* @author piaoruiqing
* @param key lock key
* @param timeout timeout
* @param retries number of retries
* @param waitingTime retry interval
* @return
* @throws InterruptedException
*/
public DistributedLock acquire(String key, long timeout, int retries, long waitingTime) throws InterruptedException {
final String value
= RandomStringUtils.randomAlphanumeric(4) + System.currentTimeMillis(); // (一)
do {
Boolean result
= stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); // (二)
if (result) {
return new RedisDistributedLock(stringRedisTemplate, key, value);
}
if (retries > NumberUtils.INTEGER_ZERO) {
TimeUnit.MILLISECONDS.sleep(waitingTime);
}
if(Thread.currentThread().isInterrupted()){
break;
}
} while (retries-- > NumberUtils.INTEGER_ZERO);
return null;
}
- (一): 锁值要保证唯一, 使用4位随机字符串+时间戳基本可满足需求
注:UUID.randomUUID()在高并发情况下性能不佳. - (二): 尝试加锁, 代码中是2.1版本的做法, 早起版本参考上一节的实现.
此代码已经可以满足自动解锁和重试的需求了, 使用方法:
// 根据key加锁, 超时时间10000ms, 重试2次, 重试间隔500ms
try(DistributedLock lock = redisLockService.acquire(key, 10000, 2, 500);){
// do something
}
但还可以再优雅一点, 将模板代码封装起来, 可支持Lambda表达式:
/**
* lock handler
* @author piaoruiqing
*
* @since JDK 1.8
*/
@FunctionalInterface // (一)
public interface LockHandler<T> {
/**
* the logic you want to execute
*
* @author piaoruiqing
*
* @return
* @throws Throwable
*/
T handle() throws Throwable; // (二)
}
- (一): 定义函数式接口, 将业务逻辑放入Lambda表达式使代码更加简洁.
- (二): 业务中的异常不建议在分布式锁中处理, 直接抛出来更合理.
使用LockHandler完成加锁的实现:
public <T> T tryLock(String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime) throws Throwable {
try (DistributedLock lock = this.acquire(key, timeout, retries, waitingTime);) {
if (lock != null) {
LOGGER.debug("get lock success, key: {}", key);
return handler.handle();
}
LOGGER.debug("get lock fail, key: {}", key);
return null;
}
}
此时可以通过比较优雅的方式使用分布式锁来完成编码:
@Test
public void testTryLock() throws Throwable {
final String key = "dlock:test-try-lock";
AnyObject anyObject = redisLockService.tryLock(key, () -> {
// do something
return new AnyObject();
}, 10000, true, 0, 0);
}
本文发布于朴瑞卿的博客, 允许非商业用途转载, 但转载必须保留原作者朴瑞卿 及链接:https://blog.piaoruiqing.com.
如有授权方面的协商或合作, 请联系邮箱: piaoruiqing@gmail.com.
优化二 (自定义异常)
自定义异常: 前文中针对分布式锁的封装可满足多数业务场景, 但是考虑这样一种情况, 如果业务本身会返回
NULL当前的实现方式可能会存在错误的处理, 因为获取锁失败也会返回NULL. 避免返回NULL固然是一种解决方式, 但无法满足所有的场景, 此时支持自定义异常或许是个不错的选择.
实现起来很容易, 在原代码的基础之上增加onFailure参数, 如果锁为空直接抛出异常即可.
加锁方法实现
public <T> T tryLock(String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime, Class<? extends RuntimeException> onFailure) throws Throwable { // (一)
try (DistributedLock lock = this.getLock(key, timeout, retries, waitingTime);) {
if (lock != null) {
LOGGER.debug("get lock success, key: {}", key);
return handler.handle();
}
LOGGER.debug("get lock fail, key: {}", key);
if (null != onFailure) {
throw onFailure.newInstance(); // (二)
}
return null;
}
}
- (一):
Class<? extends RuntimeException>限定onFailure必须是RuntimeException或其子类. 笔者认为使用RuntimeException在语义上更容易理解. 如有需要使用其他异常也未尝不可(如获取锁失败需要统一处理等情况). - (二): 反射
优化三 (优雅地使用注解)
结合APO优雅地使用注解完成分布式锁:
定义注解
为了减小篇幅折叠部分注释
/**
* distributed lock
* @author piaoruiqing
* @date: 2019/01/12 23:15
*
* @since JDK 1.8
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {
/** timeout of the lock */
long timeout() default 5L;
/** time unit */
TimeUnit unit() default TimeUnit.MILLISECONDS;
/** number of retries */
int retries() default 0;
/** interval of each retry */
long waitingTime() default 0L;
/** key prefix */
String prefix() default "";
/** parameters that construct a key */
String[] argNames() default {};
/** construct a key with parameters */
boolean argsAssociated() default true;
/** whether unlock when completed */
boolean autoUnlock() default true;
/** throw an runtime exception while fail to get lock */
Class<? extends RuntimeException> onFailure() default NoException.class;
/** no exception */
public static final class NoException extends RuntimeException {
private static final long serialVersionUID = -7821936618527445658L;
}
}
timeout: 超时时间unit: 时间单位retries: 重试次数waitingTime: 重试间隔时间prefix: key前缀, 默认为包名+类名+方法名argNames: 组成key的参数
注解可使用在方法上, 需要注意的是, 本文注解通过spring AOP实现, 故对象内部方法间调用将无效.
切面实现
/**
* distributed lock aspect
* @author piaoruiqing
* @date: 2019/02/02 22:35
*
* @since JDK 1.8
*/
@Aspect
@Order(10) // (一)
public class DistributedLockableAspect implements KeyGenerator { // (二)
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
@Resource
private RedisLockClient redisLockClient;
/**
* {@link DistributedLockable}
* @author piaoruiqing
*/
@Pointcut(value = "execution(* *(..)) && @annotation(com.github.piaoruiqing.dlock.annotation.DistributedLockable)")
public void distributedLockable() {}
/**
* @author piaoruiqing
*
* @param joinPoint
* @param lockable
* @return
* @throws Throwable
*/
@Around(value = "distributedLockable() && @annotation(lockable)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLockable lockable) throws Throwable {
long start = System.nanoTime();
final String key = this.generate(joinPoint, lockable.prefix(), lockable.argNames(), lockable.argsAssociated()).toString();
Object result = redisLockClient.tryLock(
key, () -> {
return joinPoint.proceed();
},
lockable.unit().toMillis(lockable.timeout()), lockable.autoUnlock(),
lockable.retries(), lockable.unit().toMillis(lockable.waitingTime()),
lockable.onFailure()
);
long end = System.nanoTime();
LOGGER.debug("distributed lockable cost: {} ns", end - start);
return result;
}
}
- (一): 切面优先级
- (二):
KeyGenerator为自定义的key生成策略, 使用prefix+argName+arg作为key, 具体实现见源码.
此时可以通过注解的方式使用分布式锁, 这种方式对代码入侵较小, 且简洁.
@DistributedLockable(
argNames = {"anyObject.id", "anyObject.name", "param1"},
timeout = 20, unit = TimeUnit.SECONDS,
onFailure = RuntimeException.class
)
public Long distributedLockableOnFaiFailure(AnyObject anyObject, String param1, Object param2, Long timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
LOGGER.info("distributed-lockable: " + System.nanoTime());
} catch (InterruptedException e) {
}
return System.nanoTime();
}
扩展
分布式锁的实现有多种方式, 可根据实际场景和需求选择不同的介质进行实现:
- Redis: 性能高, 对分布式锁支持友好, 实现简单, 多数场景下表现较好.
- Zookeeper: 可靠性较高, 对分布式锁支持友好, 实现较复杂但有现成的实现可以使用.
- 数据库: 实现简单, 可使用
乐观锁/悲观锁实现, 性能一般, 高并发场景下不推荐
结语
本文阐述了Redis分布式锁的JAVA实现, 完成了自动解锁、自定义异常、重试、注解锁等功能, 源码见地址.
本实现还有诸多可以优化之处, 如:
- 重入锁的实现
- 优化重试策略为订阅Redis事件: 订阅Redis事件可以进一步优化锁的性能, 可通过wait+notifyAll来替代文中的sleep.
篇幅有限, 后续再行阐述.
参考文献
- https://redis.io/topics/distlock
- https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
[…] 博客地址 https://blog.piaoruiqing.com/2019/05/19/redis分布式锁/ […]
拜读了您的代码,受益良多.
其中执行Lua脚本 应该修改为:
@Override
public void release() {
List keys = Collections.singletonList(key);
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setScriptText(COMPARE_AND_DELETE);
redisScript.setResultType(Boolean.class);
operations.execute(redisScript, keys,value);
}
否则会报
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: java.lang.IllegalStateException