锁在开发过程中属于是比较常见的一种保证资源互斥的手段,常用的手段一般就是加锁、设置超时时间、过期释放、手动释放,对应的伪代码:
1 | function method: |
伪代码应该没有问题,有同学说lock.lock(key, timeout)
应该放到try
里面,其实这里呢觉得不太适合,业务如果加锁出现了异常,方法就直接结束了,放到try
中的话,出现了异常依然会走到finally
中去释放锁,如果在释放锁之前有别的业务抢到了锁,那么这时候就被提前释放掉了,大致图解如下:
从图示可以看到线程B的锁被线程A给释放掉了,此时线程C抢占锁时能够顺利的加锁成功,结果就是造成线程B和线程C的数据出现了异常,所以lock.lock(key, timeout)
申请锁的语句不应和释放锁在同一个try...catch...finally
中的原因就不明而厉了。
既然说到了加锁,现在我们再来说说释放锁,释放锁分为自动释放和主动释放,自动释放就是非人为的释放,作为一名合格的开发人员,大家都知道在加锁时会给锁设置一个过期时间,超过这个时间之后锁自动释放,目的也很明确:
- 为了防止手动释放锁时因系统crash而造成的锁释放失败导致锁长期持有的情况;
- 一旦锁无法及时释放,那么后续的请求将全部无法成功加锁,业务将无法正常进行
- 图解如下:
图示线程A加锁成功但锁未释放,导致后续的线程B和线程C,乃至线程n都抢占不到锁,这将是一个非常严重的产线事故,不得了不得了。
很多文章都说应该在finally
代码块中释放锁,因为不论try
中代码执行是否出现异常,finally
代码块都必定会被执行,通过上面加锁代码分析知道只有抢到锁的线程才能执行到try
中,所以在finally
中处理锁释放逻辑无可厚非,但是现在有一个新的问题:如果该方法是有事务
的方法,在锁释放的时候,事务还没有提交,我们再来看张图(图有点多,但是清晰):
两个线程执行之后,age的值是2,并不是预期的3,是不是就出现了脏数据,并且脏的还不一般。所以正常的处理流程应该是先提交事务再释放锁,那么我们来想一想应该怎么做才能让事务先提交,锁后释放。
- 使用
scheduler
线程池延迟释放锁 - 使用
AOP
织入释放 - 使用
TransactionSynchronizationAdapter
监听释放
前两种大家应该很容易就想到如何实现,那就不做太多的解释了,不说又有点不负责任的样子,还是唠唠吧。。
有一个
LockProcessor
类,在文章最后
1. 锁延迟释放
锁延迟释放,是一个不错的方法,但还是没那么的完美,关键点在延迟多久释放锁,如果延迟3秒释放,那么其他线程又白白多等了3秒,但若是延迟100ms,仍然会发生锁在事务提交之前释放的情况。
-
首先我们要定义一个
ScheduledThreadPoolExecutor
线程池来执行所释放的任务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
4j
public class SnowmanScheduledExecutor {
private static final ScheduledThreadPoolExecutor EXECUTOR = new ScheduledThreadPoolExecutor(10, r -> {
Thread thread = new Thread(r);
thread.setName("lock release executor threadId - " + thread.getId());
return thread;
}, new ThreadPoolExecutor.DiscardPolicy());
/**
* accept runnable job
*/
public void execute(Runnable runnable) {
this.execute(runnable, 100);
}
public void execute(Runnable runnable, long delayTime) {
this.execute(runnable, delayTime, TimeUnit.MILLISECONDS);
}
public void execute(Runnable runnable, long delayTime, TimeUnit unit) {
EXECUTOR.schedule(runnable, delayTime, unit);
}
} -
在
finally
中将释放锁放入到线程池中静等释放在业务代码中,直接使用该类即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private SnowmanScheduledExecutor snowmanScheduledExecutor;
(rollbackFor = Exception.class)
public void generate(String lock_key) {
// acquire lock
int lockTimes = 0;
while (!lock.tryLock(groupCode, Constants.DEFAULT_TIMEOUT)) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
}
if (lockTimes++ > 3) {
throw new CannotAcquireLockException("acquire lock timeout!");
}
}
try {
doBusiness();
} catch (Exception e) {
log.error("generate error!", e);
} finally {
snowmanScheduledExecutor.execute(() -> {
lock.releaseLock(groupCode);
});
}
} -
并发测试
我们使用jmeter进行测试,开启100个线程循环10次。这里不贴图了,采用了默认的100ms作为延迟释放时间,除了吞吐量低了点,但未产生脏数据。不过虽然没有产生脏数据,但吞吐量是在高并发环境下很重要的一个指标,以降低吞吐量来达到数据一致性的方法好像也不是最优的。
####2. AOP织入释放
既然通过延迟释放不能近乎完美的解决锁释放的问题,那么通过AOP是否可以呢?我们知道AOP实际上就是通过代理的方式给类和方法添加一些额外的操作,如果我们在进入业务方法之前申请锁,在业务方法返回之后提交事务,然后在AOP中释放锁,是不是就能完美的解决了?试一下看看。
-
编写AOP代理类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.servlet.HandlerMapping;
import cc.bert.lt.config.Constants;
import cc.bert.lt.utils.LockProcessor;
import cc.bert.lt.utils.TransactionUtils;
import lombok.extern.slf4j.Slf4j;
4j
public class LockAdvise {
private LockProcessor lockProcessor;
private TransactionUtils transactionUtils;
private DefaultTransactionDefinition definition;
"execution(public * cc.bert.lt.service.*.*(..))") (
public void LockAdvise() {
definition = new DefaultTransactionDefinition();
definition.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
}
"LockAdvise()") (
public Object lock(ProceedingJoinPoint point) {
TransactionStatus status = transactionUtils.begin(definition);
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes())
.getRequest();
Map pathVariables = (Map) request.getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE);
String groupCode = String.valueOf(pathVariables.get("code"));
// acquire lock
Object obj = null;
try {
int lockTimes = 0;
while (!lockProcessor.tryLock(groupCode, Constants.DEFAULT_TIMEOUT)) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
}
if (lockTimes++ > 3) {
throw new CannotAcquireLockException("acquire lock timeout!");
}
}
obj = point.proceed();
transactionUtils.commit(status);
} catch (Throwable throwable) {
log.error("occur ex: ", throwable);
transactionUtils.rollback(status);
} finally {
lockProcessor.releaseLock(groupCode);
}
return obj;
}
} -
TransactionUtils类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36package cc.bert.lt.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import lombok.extern.slf4j.Slf4j;
/**
* @author chuan
*/
4j
public class TransactionUtils {
"code_transaction") (
private PlatformTransactionManager platformTransactionManager;
public TransactionStatus begin(DefaultTransactionDefinition definition) {
log.debug("start transaction!");
return platformTransactionManager.getTransaction(definition);
}
public void commit(TransactionStatus status) {
platformTransactionManager.commit(status);
}
public void rollback(TransactionStatus status) {
platformTransactionManager.rollback(status);
}
}
上面的代码都不需要做过多的解释,很简单,但是有一点需要注意的就是被代理方法上不能出现@Transactional
注解
3. TransactionSynchronizationAdapter
释放
这是一个比较推荐的方式,使用起来也比较简单,只需要在业务方法的finally中加入一句代码即可:
1 | // 监听器释放 |
也可以实现一个TransactionSynchronizationAdapter
子类来做,条条大路通罗马。
LockProcessor
1 | package cc.bert.lt.utils; |