Reactive Distributed Lock: A Redis Implementation

Background

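1. The original authorization project integrated Spring's RedisLockRegistry to implement distributed locking. When the authorization service was migrated to reactive programming, a reactive distributed lock implementation was needed (Reference [1]).
2. The original RedisLockRegistry is built on a Lua script and the thread ID.
3. The main goal is to keep the original business logic unchanged in the migrated project while still guarding against concurrency problems.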

Technical Approach and Challenges

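1. Reactive programming differs from the traditional model: in Reactor Netty's event-loop environment, the thread ID can no longer be used to tell lock holders apart. A Redis Lua script can still be used for concurrency control.
2. Under contention, the traditional while(true) {... break} loop with Thread.sleep can no longer be used to wait for the lock and poll its state. The waiting has to be expressed in a reactive way instead.
3. The end result is a locking scheme that stays largely consistent with RedisLockRegistry while fitting the reactive environment.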

Core Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
</dependency>

Implementation

1. Lock-handling Lua script
private static final String OBTAIN_LOCK_SCRIPT = "local lockSet = redis.call('SETNX', KEYS[1], ARGV[1])\n" +
            "if lockSet == 1 then\n" +
            "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
            "  return true\n" +
            "else\n" +
            "  return false\n" +
            "end";
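To make the script's behavior concrete, here is a minimal in-memory model in plain Java. It is not the Redis call itself: the class name LockScriptModel, the injected clock, and the map-based store are assumptions made for illustration, and only the SETNX-then-PEXPIRE semantics come from the script above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** In-memory stand-in for the lock script: SETNX followed by PEXPIRE. Illustrative only. */
class LockScriptModel {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();
    private final LongSupplier clock; // hypothetical injected clock, in milliseconds

    LockScriptModel(LongSupplier clock) {
        this.clock = clock;
    }

    /** Mirrors the script: KEYS[1] = key, ARGV[1] = lockId, ARGV[2] = ttlMillis. */
    synchronized boolean obtain(String key, String lockId, long ttlMillis) {
        long now = clock.getAsLong();
        Long deadline = expiresAt.get(key);
        if (deadline != null && deadline <= now) {
            // Emulate Redis dropping the key once its TTL has elapsed
            values.remove(key);
            expiresAt.remove(key);
        }
        if (values.putIfAbsent(key, lockId) == null) {
            // SETNX returned 1, so set the expiry (PEXPIRE)
            expiresAt.put(key, now + ttlMillis);
            return true;
        }
        // Key already present; the script does not look at who holds it
        return false;
    }
}
```

Because the script relies on SETNX alone, a second call with the same lock id also returns false until the key expires, so the lock in this model is not reentrant.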
2. Core lock acquisition snippet
/**
 * Executes the Redis script to obtain the lock.
 * @return true if the lock was obtained, false otherwise
 */
private Mono<Boolean> obtainLock() {
    return Mono.from(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate
                    .execute(ReactiveRedisDistributedLockRegistry.this.obtainLockScript,
                            Collections.singletonList(this.lockKey),
                            List.of(this.lockId,String.valueOf(ReactiveRedisDistributedLockRegistry.this.expireAfter)))
            )
            .map(success -> {
                boolean result = Boolean.TRUE.equals(success);
                if (result) {
                    this.lockedAt = System.currentTimeMillis();
                }
                return result;
            });
}
3. Core lock release snippet
/**
 * Removes the lock key from Redis, preferring UNLINK and falling back to DELETE.
 * @return true once the key has been removed
 */
private Mono<Boolean> removeLockKey() {
    return Mono.just(ReactiveRedisDistributedLockRegistry.this.unlinkAvailable)
            .filter(unlink -> unlink)
            .flatMap(unlink -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate
                    .unlink(this.lockKey)
                    .doOnError(throwable -> {
                        ReactiveRedisDistributedLockRegistry.this.unlinkAvailable = false;
                        if (log.isDebugEnabled()) {
                            log.debug("The UNLINK command has failed (not supported on the Redis server?); " +
                                    "falling back to the regular DELETE command", throwable);
                        } else {
                            log.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                                    "falling back to the regular DELETE command: " + throwable.getMessage());
                        }
                    })
                    .onErrorResume(throwable -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey))
            )
            .switchIfEmpty(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey))
            .then(Mono.just(true));
}
4. Snippet for checking whether the lock is held
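/**
 * Checks whether the lock key in Redis is currently held by this lock instance.
 * @return true if the stored value equals this lock id, false otherwise
 */
Mono<Boolean> isAcquiredInThisProcess() {
    return ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.opsForValue()
            .get(this.lockKey)
            .map(this.lockId::equals)
            // get() completes empty when the key is absent; treat that as "not held"
            .defaultIfEmpty(false);
}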
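The ownership check above and the later key removal are two separate Redis commands, so a key could in principle expire and be re-acquired by another holder between them. As a hedged illustration of the compare-and-delete idea, and not the author's implementation, the sketch below models it in plain Java with ConcurrentHashMap.remove(key, value), which removes the entry only while the stored value still matches. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative model: a key may only be removed by the lock id that stored it. */
class OwnedLockStore {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    /** Stores lockId under key if the key is free; returns true on success. */
    boolean obtain(String key, String lockId) {
        return store.putIfAbsent(key, lockId) == null;
    }

    /** Same comparison as isAcquiredInThisProcess: the stored value equals this lock id. */
    boolean isHeldBy(String key, String lockId) {
        return lockId.equals(store.get(key));
    }

    /** Atomic compare-and-delete: removes the key only if lockId still owns it. */
    boolean release(String key, String lockId) {
        return store.remove(key, lockId);
    }
}
```

On the Redis side the same effect would need a single script or transaction; that part is outside what the snippets here show.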
5. Base lock interface definition
public interface ReactiveDistributedLock {

    /**
     * Returns the lock key.
     * @return the lock key
     */
    String getLockKey();

    /**
     * Try to acquire the lock once. The lock is held for a preconfigured duration.
     * @return true if the lock was acquired, false otherwise
     * <strong>An empty flow is treated as false.</strong>
     */
    Mono<Boolean> acquireOnce();

    /**
     * Try to acquire the lock. The lock is held for a preconfigured duration.
     * @return the result of the acquisition
     * <strong>If the flow is empty, a {@link CannotAcquireLockException} is thrown.</strong>
     */
    Mono<Boolean> acquire();

    /**
     * Try to acquire the lock for a given duration.
     * @param duration the duration to use; the Redis implementation applies it as the retry timeout
     * @return the result of the acquisition
     * <strong>The given duration must be less than the default expiry duration; otherwise Redis will expire the lock key after the default expiry duration.</strong>
     * <strong>If the flow is empty, a {@link CannotAcquireLockException} is thrown.</strong>
     */
    Mono<Boolean> acquire(Duration duration);

    /**
     * Release the lock.
     * @return true once the lock has been released
     * <strong>If the lock key no longer exists in Redis, an {@link IllegalStateException} is thrown.</strong>
     */
    Mono<Boolean> release();

}
6. Base lock interface implementation
private final class ReactiveRedisDistributedLock implements ReactiveDistributedLock {

        @Override
        public String getLockKey() {
            return this.lockKey;
        }

        @Override
        public Mono<Boolean> acquireOnce() {
            log.debug("Acquire Lock Once,LockKey:{}",this.lockKey);
            return this.obtainLock()
                    .doOnNext(lockResult -> log.info("Obtain Lock Once,LockKey:{},Result:{}",this.lockKey,lockResult))
                    .doOnError(this::rethrowAsLockException);
        }

        @Override
        public Mono<Boolean> acquire() {
            log.debug("Acquire Lock By Default Duration :{}" ,expireDuration);
            // Use the default configured maximum wait time to acquire the lock
            return this.acquire(ReactiveRedisDistributedLockRegistry.this.expireDuration);
        }

        @Override
        public Mono<Boolean> acquire(Duration duration) {
            // Try to obtain the lock
            return this.obtainLock()
                    // Keep only successful attempts
                    .filter(result -> result)
                    // If the flow is empty (lock not obtained), retry
                    .repeatWhenEmpty(Repeat.onlyIf(repeatContext -> true)
                            // Overall retry timeout
                            .timeout(duration)
                            // Interval between retries
                            .fixedBackoff(Duration.ofMillis(100))
                            // Log each retry
                            .doOnRepeat(objectRepeatContext -> {
                                if (log.isTraceEnabled()) {
                                    log.trace("Repeat Acquire Lock Repeat Content:{}",objectRepeatContext);
                                }
                            })
                    )
                    // `defaultIfEmpty` is required here: once the repeat times out, the flow completes empty.
                    // If that is not handled, the flow stops there or falls through to the outermost empty handler.
                    .defaultIfEmpty(false)
                    // Log the lock result
                    .doOnNext(lockResult -> log.info("Obtain Lock,Lock Result :{},Lock Info:{}",lockResult,this))
                    // On error, rethrow as a lock exception
                    .doOnError(this::rethrowAsLockException);
        }

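        @Override
        public Mono<Boolean> release() {
            // Check whether this lock instance currently holds the lock
            return this.isAcquiredInThisProcess()
                    // Continue only if the lock is held by this instance
                    .filter(isThisProcess -> isThisProcess)
                    // Release the lock
                    .flatMap(isThisProcess -> this.removeLockKey()
                            // Log the result
                            .doOnNext(releaseResult -> log.info("Released Lock:{},Lock Info:{}",releaseResult,this))
                            // Rethrow unexpected errors
                            .onErrorResume(throwable -> Mono.fromRunnable(() -> ReflectionUtils.rethrowRuntimeException(throwable)))
                    )
                    // An empty flow here means the lock is no longer held by this instance,
                    // for example because Redis expired it after the configured maximum expiry.
                    // The check sits outside the flatMap because removeLockKey() always emits a value,
                    // so the empty case can only come from the ownership filter above.
                    .switchIfEmpty(Mono.error(new IllegalStateException("Lock was released in the store due to expiration. " + "The integrity of data protected by this lock may have been compromised.")));
        }
}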
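The acquire(Duration) method above waits for the lock without a blocking loop. As a rough JDK-only analogue of "retry with a fixed backoff until a timeout", the sketch below schedules the next attempt on a timer instead of calling Thread.sleep. It is not Reactor code and not the author's implementation; the class NonBlockingRetry and its parameters are hypothetical, and the attempt is modelled as a simple BooleanSupplier.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** JDK-only analogue of retrying with a fixed backoff until a timeout, without sleeping. */
class NonBlockingRetry {

    /** Completes with true once attempt succeeds, or false when the timeout elapses first. */
    static CompletableFuture<Boolean> acquire(BooleanSupplier attempt, Duration timeout,
                                              Duration backoff, ScheduledExecutorService timer) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        long deadline = System.nanoTime() + timeout.toNanos();
        tryOnce(attempt, deadline, backoff, timer, result);
        return result;
    }

    private static void tryOnce(BooleanSupplier attempt, long deadline, Duration backoff,
                                ScheduledExecutorService timer, CompletableFuture<Boolean> result) {
        boolean obtained;
        try {
            obtained = attempt.getAsBoolean();
        } catch (RuntimeException e) {
            result.completeExceptionally(e); // surface failures instead of hanging
            return;
        }
        if (obtained) {
            result.complete(true);   // lock obtained
        } else if (System.nanoTime() - deadline >= 0) {
            result.complete(false);  // timed out, comparable to defaultIfEmpty(false)
        } else {
            // Schedule the next attempt instead of sleeping on the current thread
            timer.schedule(() -> tryOnce(attempt, deadline, backoff, timer, result),
                    backoff.toMillis(), TimeUnit.MILLISECONDS);
        }
    }
}
```

The caller's thread is never parked between attempts, which is the property that matters on an event loop.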
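7. A built-in scheduled task detects expired RedisLock instances that are no longer in use and evicts them from the in-memory cache. The task is tied to the Spring bean lifecycle (InitializingBean, DisposableBean) so that it starts and stops with the bean.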
private Scheduler scheduler = Schedulers.newSingle("redis-lock-evict",true);

// Hook into the Spring bean lifecycle
@Override
public void afterPropertiesSet() {
    log.debug("Initialize Auto Remove Unused Lock Execution");
    // Use Flux.interval to implement the scheduled task
    Flux.interval(expireEvictIdle, scheduler)
            .flatMap(value -> {
                long now = System.currentTimeMillis();
                log.trace("Auto Remove Unused Lock ,Evict Triggered");
                return Flux.fromIterable(this.locks.entrySet())
                        // Keep only lock objects whose expiry time has passed
                        .filter(entry -> now - entry.getValue().getLockedAt() > expireAfter)
                        // Remove the locks that are no longer held
                        .flatMap(entry -> entry.getValue()
                                .isAcquiredInThisProcess()
                                .filter(inProcess -> !inProcess)
                                .doOnNext(inProcess -> {
                                    this.locks.remove(entry.getKey());
                                    log.debug("Auto Remove Unused Lock,Lock Info:{}", entry);
                                })
                                // Log errors and continue
                                .onErrorResume(throwable -> {
                                    log.error("Auto Remove Unused Locks Occur Exception,Lock Info: " + entry, throwable);
                                    return Mono.empty();
                                })
                        );
            })
            // Nothing runs until the Flux is subscribed to
            .subscribe();
}

@Override
public void destroy() {
    log.debug("Shutdown Auto Remove Unused Lock Execution");
    // Hook into the Spring bean lifecycle and dispose of the Scheduler
    this.scheduler.dispose();
}
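The eviction rule can be read in isolation from the reactive plumbing: an entry is dropped when it was locked longer ago than the expiry time and is no longer held. The following is a minimal sketch of that rule in plain Java, assuming a hypothetical LockCacheEvictor class, a map from lock key to last-locked timestamp, and a predicate standing in for the Redis ownership check.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Illustrative model of one eviction pass over the in-memory lock cache. */
class LockCacheEvictor {

    /**
     * Removes entries locked longer ago than expireAfterMillis that are no longer held.
     * lockedAt maps a lock key to the time it was last acquired; stillHeld stands in
     * for the Redis ownership check. Returns the number of evicted entries.
     */
    static int evict(Map<String, Long> lockedAt, long now, long expireAfterMillis,
                     Predicate<String> stillHeld) {
        int before = lockedAt.size();
        lockedAt.entrySet().removeIf(entry ->
                now - entry.getValue() > expireAfterMillis && !stillHeld.test(entry.getKey()));
        return before - lockedAt.size();
    }
}
```

Entries that are expired but still held stay in the cache, matching the filter on the ownership check in the scheduled task.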
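8. Refine the lock interface by adding default methods that make lock control and handling easier. The logic to run under the lock is wrapped in a Supplier for convenient invocation.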
/**
 * Acquire a lock and release it after the action completes or fails.
 *
 * @param <T>  type of value emitted by the action
 * @param executionSupplier supplies the action to subscribe to once the lock is acquired
 * @return the value emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire()
 */
default <T> Mono<T> acquireAndExecute(Supplier<Mono<T>> executionSupplier) {
    return acquire()
            .flatMap(acquireResult -> Mono.just(acquireResult)
                        .filter(result -> result)
                        // Works together with the acquire logic: an empty flow here means the lock could not be obtained
                        .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                        .flatMap(lockResult -> executionSupplier
                                .get()
                                .flatMap(result -> this.release()
                                        .flatMap(releaseResult -> Mono.just(result))
                                )
                                .switchIfEmpty(this.release().then(Mono.empty()))
                                .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))
                        )
            );
}

/**
 * Acquire a lock for a given duration and release it after the action completes or fails.
 *
 * @param <T>      type of value emitted by the action
 * @param duration duration passed to {@link ReactiveDistributedLock#acquire(Duration)}
 * @param executionSupplier     supplies the action to subscribe to once the lock is acquired
 * @return the value emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire(Duration)
 */
default <T> Mono<T> acquireAndExecute(Duration duration, Supplier<Mono<T>> executionSupplier) {
    return acquire(duration)
            .flatMap(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                    .flatMap(lockResult -> executionSupplier
                            .get()
                            .flatMap(result -> this.release()
                                    .flatMap(releaseResult -> Mono.just(result))
                            )
                            .switchIfEmpty(this.release().then(Mono.empty()))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))
                    )
            );
}

/**
 * Acquire a lock and release it after the action completes or fails.
 *
 * @param <T>  type of value emitted by the action
 * @param executionSupplier     supplies the action to subscribe to once the lock is acquired
 * @return the values emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire()
 */
default <T> Flux<T> acquireAndExecuteMany(Supplier<Flux<T>> executionSupplier) {
    return acquire()
            .flatMapMany(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                    .flatMapMany(lockResult -> executionSupplier
                            .get()
                            // Release once after the whole Flux completes (this also covers an empty Flux);
                            // releasing per element would give up the lock after the first element
                            .concatWith(this.release().thenMany(Flux.empty()))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))
                    )
            );
}

/**
 * Acquire a lock for a given duration and release it after the action completes or fails.
 *
 * @param <T>      type of value emitted by the action
 * @param duration duration passed to {@link ReactiveDistributedLock#acquire(Duration)}
 * @param executionSupplier     supplies the action to subscribe to once the lock is acquired
 * @return the values emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire(Duration)
 */
default <T> Flux<T> acquireAndExecuteMany(Duration duration, Supplier<Flux<T>> executionSupplier) {
    return acquire(duration)
            .flatMapMany(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                    .flatMapMany(lockResult -> executionSupplier
                            .get()
                            // Release once after the whole Flux completes (this also covers an empty Flux);
                            // releasing per element would give up the lock after the first element
                            .concatWith(this.release().thenMany(Flux.empty()))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))
                    )
            );
}

Usage

Configure the parameters in application.yml:
lock:
  redis:
    reactive:
      expire-after: 10s
      expire-evict-idle: 1s
Inject the bean:
@Autowired
private ReactiveRedisDistributedLockRegistry reactiveRedisDistributedLockRegistry;
1. Acquire once and fail fast
@Test
public void testAcquireOnce() throws Exception {
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_ONCE";
    Flux<String> flux = Flux.range(0, 5)
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireOnce()
                    .filter(acquireResult -> acquireResult)
                    .flatMap(acquireResult -> processFunctions.processFunction())
                    .switchIfEmpty(Mono.just(FAILED))
            )
            .doOnNext(System.out::println);
    StepVerifier.create(flux)
            .expectNext(OK)
            .expectNext(FAILED)
            .expectNext(FAILED)
            .expectNext(FAILED)
            .expectNext(FAILED)
            .verifyComplete();

}
2. Acquire, waiting up to the default timeout
@Test
public void testAcquireDefaultDurationAndProcessDuringTheExpireDuration() throws Exception {
    //default lock expire is 10S
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_DEFAULT";
    Flux<String> flux = Flux.range(0, 3)
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireAndExecute(() ->
                            processFunctions.processDelayFunction(Duration.ofSeconds(2))
                    )
                    .doOnNext(System.out::println)
                    .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()),throwable -> {
                        System.out.println("Lock Error");
                        return Mono.just(FAILED);
                    })
            );
    StepVerifier.create(flux)
            .expectNext(OK)
            .expectNext(OK)
            .expectNext(OK)
            .verifyComplete();

}
3. Acquire with a given duration
@Test
public void testAcquireDuration() throws Exception {
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_GIVEN_DURATION";
    Flux<String> flux = Flux.range(0, 3)
            .subscribeOn(Schedulers.parallel())
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireAndExecute(Duration.ofSeconds(3), () ->
                            processFunctions.processDelayFunction(Duration.ofSeconds(2))
                    )
                    .doOnNext(System.out::println)
                    .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()), throwable -> {
                        System.out.println("Lock Error");
                        return Mono.just(FAILED);
                    })
            );
    StepVerifier.create(flux)
            .expectNext(OK)
            .expectNext(FAILED)
            .expectNext(OK)
            .verifyComplete();

}

References

1. RedisLockRegistry: https://docs.spring.io/spring-integration/docs/5.3.6.RELEASE/reference/html/redis.html#redis-lock-registry
2. Trigger Mono Execution After Another Mono Terminates: https://stackoverflow.com/questions/50686524/how-to-trigger-mono-execution-after-another-mono-terminates

Source Code

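The source is maintained on GitHub as reactive-redis-distributed-lock; issues and stars are welcome. It is currently written as a Spring Boot scaffold and has not been published to Maven Central, so package it yourself if you need it.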