锁在事务中的释放

锁在开发过程中属于是比较常见的一种保证资源互斥的手段,常用的手段一般就是加锁、设置超时时间、过期释放、手动释放,对应的伪代码:

1
2
3
4
5
6
7
8
9
function method:
if !lock.lock(key, timeout):
return
try:
doBusiness()
exception:
doException()
finally:
lock.release(key)

伪代码应该没有问题,有同学说lock.lock(key, timeout)应该放到try里面,其实这里呢觉得不太适合,业务如果加锁出现了异常,方法就直接结束了,放到try中的话,出现了异常依然会走到finally中去释放锁,如果在释放锁之前有别的业务抢到了锁,那么这时候就被提前释放掉了,大致图解如下:

image-20210429093500721

从图示可以看到线程B的锁被线程A给释放掉了,此时线程C抢占锁时能够顺利的加锁成功,结果就是造成线程B和线程C的数据出现了异常,所以lock.lock(key, timeout)申请锁的语句不应和释放锁在同一个try...catch...finally中的原因就不明而厉了。

既然说到了加锁,现在我们再来说说释放锁,释放锁分为自动释放和主动释放,自动释放就是非人为的释放,作为一名合格的开发人员,大家都知道在加锁时会给锁设置一个过期时间,超过这个时间之后锁自动释放,目的也很明确:

  1. 为了防止手动释放锁时因系统crash而造成的锁释放失败导致锁长期持有的情况;
  2. 一旦锁无法及时释放,那么后续的请求将全部无法成功加锁,业务将无法正常进行
  3. 图解如下:
image-20210429094814276

图示线程A加锁成功但锁未释放,导致后续的线程B和线程C,乃至线程n都抢占不到锁,这将是一个非常严重的产线事故,不得了不得了。

很多文章都说应该在finally代码块中释放锁,因为不论try中代码执行是否出现异常,finally代码块都必定会被执行,通过上面加锁代码分析知道只有抢到锁的线程才能执行到try中,所以在finally中处理锁释放逻辑无可厚非,但是现在有一个新的问题:如果该方法是有事务的方法,在锁释放的时候,事务还没有提交,我们再来看张图(图有点多,但是清晰):

image-20210429105852689

两个线程执行之后,age的值是2,并不是预期的3,是不是就出现了脏数据,并且脏的还不一般。所以正常的处理流程应该是先提交事务再释放锁,那么我们来想一想应该怎么做才能让事务先提交,锁后释放。

  1. 使用scheduler线程池延迟释放锁
  2. 使用AOP织入释放
  3. 使用TransactionSynchronizationAdapter监听释放

前两种大家应该很容易就想到如何实现,那就不做太多的解释了,不说又有点不负责任的样子,还是唠唠吧。。

有一个LockProcessor类,在文章最后

1. 锁延迟释放

锁延迟释放,是一个不错的方法,但还是没那么的完美,关键点在延迟多久释放锁,如果延迟3秒释放,那么其他线程又白白多等了3秒,但若是延迟100ms,仍然会发生锁在事务提交之前释放的情况。

  • 首先我们要定义一个ScheduledThreadPoolExecutor线程池来执行所释放的任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.springframework.stereotype.Component;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @Component
    public class SnowmanScheduledExecutor {

    private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(10, r -> {
    Thread thread = new Thread(r);
    thread.setName("lock release executor threadId - " + thread.getId());
    return thread;
    }, new ThreadPoolExecutor.DiscardPolicy());

    /**
    * accept runnable job
    */
    public void execute(Runnable runnable) {
    this.execute(runnable, 100);
    }

    public void execute(Runnable runnable, long delayTime) {
    this.execute(runnable, delayTime, TimeUnit.MILLISECONDS);
    }

    public void execute(Runnable runnable, long delayTime, TimeUnit unit) {
    EXECUTOR.schedule(runnable, delayTime, unit);
    }

    }
  • finally中将释放锁放入到线程池中静等释放

    在业务代码中,直接使用该类即可。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    @Autowired
    private SnowmanScheduledExecutor snowmanScheduledExecutor;

    @Transactional(rollbackFor = Exception.class)
    public void generate(String lock_key) {

    // acquire lock
    int lockTimes = 0;
    while (!lock.tryLock(groupCode, Constants.DEFAULT_TIMEOUT)) {
    try {
    Thread.sleep(100L);
    } catch (InterruptedException e) {
    }
    if (lockTimes++ > 3) {
    throw new CannotAcquireLockException("acquire lock timeout!");
    }
    }
    try {
    doBusiness();
    } catch (Exception e) {
    log.error("generate error!", e);
    } finally {
    snowmanScheduledExecutor.execute(() -> {
    lock.releaseLock(groupCode);
    });
    }
    }
  • 并发测试

    我们使用jmeter进行测试,开启100个线程循环10次。这里不贴图了,采用了默认的100ms作为延迟释放时间,除了吞吐量低了点,但未产生脏数据。不过虽然没有产生脏数据,但吞吐量是在高并发环境下很重要的一个指标,以降低吞吐量来达到数据一致性的方法好像也不是最优的。

####2. AOP织入释放

既然通过延迟释放不能近乎完美的解决锁释放的问题,那么通过AOP是否可以呢?我们知道AOP实际上就是通过代理的方式给类和方法添加一些额外的操作,如果我们在进入业务方法之前申请锁,在业务方法返回之后提交事务,然后在AOP中释放锁,是不是就能完美的解决了?试一下看看。

  • 编写AOP代理类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    import java.util.Map;

    import javax.servlet.http.HttpServletRequest;

    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.dao.CannotAcquireLockException;
    import org.springframework.transaction.TransactionDefinition;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.DefaultTransactionDefinition;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;
    import org.springframework.web.servlet.HandlerMapping;

    import cc.bert.lt.config.Constants;
    import cc.bert.lt.utils.LockProcessor;
    import cc.bert.lt.utils.TransactionUtils;
    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    @Aspect
    @Configuration
    public class LockAdvise {

    @Autowired
    private LockProcessor lockProcessor;

    @Autowired
    private TransactionUtils transactionUtils;

    private DefaultTransactionDefinition definition;

    @Pointcut("execution(public * cc.bert.lt.service.*.*(..))")
    public void LockAdvise() {
    definition = new DefaultTransactionDefinition();
    definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
    }

    @Around("LockAdvise()")
    public Object lock(ProceedingJoinPoint point) {
    TransactionStatus status = transactionUtils.begin(definition);

    HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
    .getRequest();
    Map pathVariables = (Map) request.getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE);

    String groupCode = String.valueOf(pathVariables.get("code"));

    // acquire lock
    Object obj = null;
    try {
    int lockTimes = 0;
    while (!lockProcessor.tryLock(groupCode, Constants.DEFAULT_TIMEOUT)) {
    try {
    Thread.sleep(100L);
    } catch (InterruptedException e) {
    }
    if (lockTimes++ > 3) {
    throw new CannotAcquireLockException("acquire lock timeout!");
    }
    }
    obj = point.proceed();
    transactionUtils.commit(status);
    } catch (Throwable throwable) {
    log.error("occur ex: ", throwable);
    transactionUtils.rollback(status);
    } finally {
    lockProcessor.releaseLock(groupCode);
    }
    return obj;
    }
    }
  • TransactionUtils类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    package cc.bert.lt.utils;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.DefaultTransactionDefinition;

    import lombok.extern.slf4j.Slf4j;

    /**
    * @author chuan
    */
    @Slf4j
    @Component
    public class TransactionUtils {

    @Autowired
    @Qualifier("code_transaction")
    private PlatformTransactionManager platformTransactionManager;

    public TransactionStatus begin(DefaultTransactionDefinition definition) {
    log.debug("start transaction!");
    return platformTransactionManager.getTransaction(definition);
    }

    public void commit(TransactionStatus status) {
    platformTransactionManager.commit(status);
    }

    public void rollback(TransactionStatus status) {
    platformTransactionManager.rollback(status);
    }

    }

上面的代码都不需要做过多的解释,很简单,但是有一点需要注意的就是被代理方法上不能出现@Transactional注解

3. TransactionSynchronizationAdapter释放

这是一个比较推荐的方式,使用起来也比较简单,只需要在业务方法的finally中加入一句代码即可:

1
2
3
4
5
6
7
// 监听器释放
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
lockProcessor.releaseLock(groupCode);
}
});

也可以实现一个TransactionSynchronizationAdapter子类来做,条条大路通罗马。


LockProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cc.bert.lt.utils;

import java.util.Set;

import org.springframework.stereotype.Component;

import cc.bert.lt.config.Constants;
import lombok.extern.slf4j.Slf4j;

/**
* @author chuan
*/
@Slf4j
@Component
public class LockProcessor {

/**
* groupCode cache set
*/
private volatile Set<String> groupCodeSet = new ConcurrentHashSet<>();

/**
* release lock
*
* @param groupCode
*/
public boolean releaseLock(String groupCode) {
return groupCodeSet.remove(groupCode);
}

/**
* try to lock the group
*
* @param groupCode
* @param timeout unit: ms
* @return
*/
public boolean tryLock(String groupCode, long timeout) {
if (timeout <= 0L) {
timeout = Constants.DEFAULT_TIMEOUT;
}
long startTime = System.currentTimeMillis();
while ((System.currentTimeMillis() - startTime) < timeout) {
if (groupCodeSet.add(groupCode)) {
return true;
}
}
return false;
}

}