Thread 模式类比的是工厂里车间工人的工作模式。但其实在现实世界,工厂里还有一种流水线的工作模式,类比到编程领域,就是生产者 - 消费者模式

当然,除了在线程池中的应用,为了提升性能,并发编程领域很多地方也都用到了生产者 - 消费者模式,例如 Log4j2 中异步 Appender 内部也用到了生产者 - 消费者模式。所以今天我们就来深入地聊聊生产者 - 消费者模式,看看它具体有哪些优点,以及如何提升系统的性能。

img

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 任务队列
BlockingQueue<Task> bq=new
  LinkedBlockingQueue<>(2000);
// 启动 5 个消费者线程
// 执行批量任务  
void start() {
  ExecutorService es=xecutors
    .newFixedThreadPool(5);
  for (int i=0; i<5; i++) {
    es.execute(()->{
      try {
        while (true) {
          // 获取批量任务
          List<Task> ts=pollTasks();
          // 执行批量任务
          execTasks(ts);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}
// 从任务队列中获取批量任务
List<Task> pollTasks() 
    throws InterruptedException{
  List<Task> ts=new LinkedList<>();
  // 阻塞式获取一条任务
  Task t = bq.take();
  while (t != null) {
    ts.add(t);
    // 非阻塞式获取一条任务
    t = bq.poll();
  }
  return ts;
}
// 批量执行任务
execTasks(List<Task> ts) {
  // 省略具体代码无数
}

支持分阶段提交以提升性能

利用生产者 - 消费者模式还可以轻松地支持一种分阶段提交的应用场景。我们知道写文件如果同步刷盘性能会很慢,所以对于不是很重要的数据,我们往往采用异步刷盘的方式。我曾经参与过一个项目,其中的日志组件是自己实现的,采用的就是异步刷盘方式,刷盘的时机是:

  1. ERROR 级别的日志需要立即刷盘
  2. 数据积累到 500 条需要立即刷盘
  3. 存在未刷盘数据,且 5 秒钟内未曾刷盘,需要立即刷盘。

这个日志组件的异步刷盘操作本质上其实就是一种分阶段提交。下面我们具体看看用生产者 - 消费者模式如何实现。在下面的示例代码中,可以通过调用 info()error() 方法写入日志,这两个方法都是创建了一个日志任务 LogMsg,并添加到阻塞队列中,调用 info()error() 方法的线程是生产者;而真正将日志写入文件的是消费者线程,在 Logger 这个类中,我们只创建了 1 个消费者线程,在这个消费者线程中,会根据刷盘规则执行刷盘操作,逻辑很简单,这里就不赘述了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class Logger {
  // 任务队列  
  final BlockingQueue<LogMsg> bq
    = new BlockingQueue<>();
  //flush 批量  
  static final int batchSize=500;
  // 只需要一个线程写日志
  ExecutorService es = 
    Executors.newFixedThreadPool(1);
  // 启动写日志线程
  void start(){
    File file=File.createTempFile(
      "foo", ".log");
    final FileWriter writer=
      new FileWriter(file);
    this.es.execute(()->{
      try {
        // 未刷盘日志数量
        int curIdx = 0;
        long preFT=System.currentTimeMillis();
        while (true) {
          LogMsg log = bq.poll(
            5, TimeUnit.SECONDS);
          // 写日志
          if (log != null) {
            writer.write(log.toString());
            ++curIdx;
          }
          // 如果不存在未刷盘数据,则无需刷盘
          if (curIdx <= 0) {
            continue;
          }
          // 根据规则刷盘
          if (log!=null && log.level==LEVEL.ERROR ||
              curIdx == batchSize ||
              System.currentTimeMillis()-preFT>5000){
            writer.flush();
            curIdx = 0;
            preFT=System.currentTimeMillis();
          }
        }
      }catch(Exception e){
        e.printStackTrace();
      } finally {
        try {
          writer.flush();
          writer.close();
        }catch(IOException e){
          e.printStackTrace();
        }
      }
    });  
  }
  // 写 INFO 级别日志
  void info(String msg) {
   // 将队列放入一个 打印的 处理
    bq.put(new LogMsg(
      LEVEL.INFO, msg));
  }
  // 写 ERROR 级别日志
  void error(String msg) {
    bq.put(new LogMsg(
      LEVEL.ERROR, msg));
  }
}
// 日志级别
enum LEVEL {
  INFO, ERROR
}
class LogMsg {
  LEVEL level;
  String msg;
  // 省略构造函数实现
  LogMsg(LEVEL lvl, String msg){}
  // 省略 toString() 实现
  String toString(){}
}

总结

Java 语言提供的线程池本身就是一种生产者 - 消费者模式的实现,但是线程池中的线程每次只能从任务队列中消费一个任务来执行,对于大部分并发场景这种策略都没有问题。但是有些场景还是需要自己来实现,例如需要批量执行以及分阶段提交的场景。

生产者 - 消费者模式在分布式计算中的应用也非常广泛。在分布式场景下,你可以借助分布式消息队列(MQ)来实现生产者 - 消费者模式。MQ 一般都会支持两种消息模型,一种是点对点模型,一种是发布订阅模型。这两种模型的区别在于,点对点模型里一个消息只会被一个消费者消费,和 Java 的线程池非常类似(Java 线程池的任务也只会被一个线程执行);而发布订阅模型里一个消息会被多个消费者消费,本质上是一种消息的广播,在多线程编程领域,你可以结合观察者模式实现广播功能。

其他问题

在日志组件异步刷盘的示例代码中,写日志的线程以 while(true){} 的方式执行,你有哪些办法可以优雅地终止这个线程呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
this.writer.execute(()->{
  try {
    // 未刷盘日志数量
    int curIdx = 0;
    long preFT=System.currentTimeMillis();
    while (true) {
    ......
    }
  } catch(Exception e) {}
}    
  1. 使用线程池的shutdown或者shutdownNow关闭线程池
  2. while循环条件设置为一个volatile boolean变量
  3. 可以使用interrupt,但是线程是线程池管理的,没有消费者线程的引用中断不了