Thread 模式类比的是工厂里车间工人的工作模式。但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就是生产者 - 消费者模式。
当然,除了在线程池中的应用,为了提升性能,并发编程领域很多地方也都用到了生产者 - 消费者模式,例如 Log4j2 中异步 Appender 内部也用到了生产者 - 消费者模式。所以今天我们就来深入地聊聊生产者 - 消费者模式,看看它具体有哪些优点,以及如何提升系统的性能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// 任务队列
BlockingQueue<Task> bq=new
LinkedBlockingQueue<>(2000);
// 启动 5 个消费者线程
// 执行批量任务
void start() {
ExecutorService es=xecutors
.newFixedThreadPool(5);
for (int i=0; i<5; i++) {
es.execute(()->{
try {
while (true) {
// 获取批量任务
List<Task> ts=pollTasks();
// 执行批量任务
execTasks(ts);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
// 从任务队列中获取批量任务
List<Task> pollTasks()
throws InterruptedException{
List<Task> ts=new LinkedList<>();
// 阻塞式获取一条任务
Task t = bq.take();
while (t != null) {
ts.add(t);
// 非阻塞式获取一条任务
t = bq.poll();
}
return ts;
}
// 批量执行任务
execTasks(List<Task> ts) {
// 省略具体代码无数
}
|
支持分阶段提交以提升性能
利用生产者 - 消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。我曾经参与过一个项目,其中的日志组件是自己实现的,采用的就是异步刷盘方式,刷盘的时机是:
- ERROR 级别的日志需要立即刷盘;
- 数据积累到 500 条需要立即刷盘;
- 存在未刷盘数据,且 5 秒钟内未曾刷盘,需要立即刷盘。
这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面我们具体看看用生产者 - 消费者模式如何实现。在下面的示例代码中,可以通过调用 info()
和error()
方法写入日志,这两个方法都是创建了一个日志任务 LogMsg,并添加到阻塞队列中,调用 info()
和error()
方法的线程是生产者;而真正将日志写入文件的是消费者线程,在 Logger 这个类中,我们只创建了 1 个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作,逻辑很简单,这里就不赘述了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
class Logger {
// 任务队列
final BlockingQueue<LogMsg> bq
= new BlockingQueue<>();
//flush 批量
static final int batchSize=500;
// 只需要一个线程写日志
ExecutorService es =
Executors.newFixedThreadPool(1);
// 启动写日志线程
void start(){
File file=File.createTempFile(
"foo", ".log");
final FileWriter writer=
new FileWriter(file);
this.es.execute(()->{
try {
// 未刷盘日志数量
int curIdx = 0;
long preFT=System.currentTimeMillis();
while (true) {
LogMsg log = bq.poll(
5, TimeUnit.SECONDS);
// 写日志
if (log != null) {
writer.write(log.toString());
++curIdx;
}
// 如果不存在未刷盘数据,则无需刷盘
if (curIdx <= 0) {
continue;
}
// 根据规则刷盘
if (log!=null && log.level==LEVEL.ERROR ||
curIdx == batchSize ||
System.currentTimeMillis()-preFT>5000){
writer.flush();
curIdx = 0;
preFT=System.currentTimeMillis();
}
}
}catch(Exception e){
e.printStackTrace();
} finally {
try {
writer.flush();
writer.close();
}catch(IOException e){
e.printStackTrace();
}
}
});
}
// 写 INFO 级别日志
void info(String msg) {
// 将队列放入一个 打印的 处理
bq.put(new LogMsg(
LEVEL.INFO, msg));
}
// 写 ERROR 级别日志
void error(String msg) {
bq.put(new LogMsg(
LEVEL.ERROR, msg));
}
}
// 日志级别
enum LEVEL {
INFO, ERROR
}
class LogMsg {
LEVEL level;
String msg;
// 省略构造函数实现
LogMsg(LEVEL lvl, String msg){}
// 省略 toString() 实现
String toString(){}
}
|
总结
Java 语言提供的线程池本身就是一种生产者 - 消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。
生产者 - 消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,你可以借助分布式消息队列(MQ)来实现生产者 - 消费者模式。MQ 一般都会支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和 Java 的线程池非常类似(Java 线程池的任务也只会被一个线程执行);而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,你可以结合观察者模式实现广播功能。
其他问题
在日志组件异步刷盘的示例代码中,写日志的线程以 while(true){}
的方式执行,你有哪些办法可以优雅地终止这个线程呢?
1
2
3
4
5
6
7
8
9
10
|
this.writer.execute(()->{
try {
// 未刷盘日志数量
int curIdx = 0;
long preFT=System.currentTimeMillis();
while (true) {
......
}
} catch(Exception e) {}
}
|
- 使用线程池的shutdown或者shutdownNow关闭线程池
- while循环条件设置为一个volatile boolean变量
- 可以使用interrupt,但是线程是线程池管理的,没有消费者线程的引用中断不了