如何理解 Thread-Per-Message 模式

现实世界里,很多事情我们都需要委托他人办理,一方面受限于我们的能力,总有很多搞不定的事,比如教育小朋友,搞不定怎么办呢?只能委托学校老师了;另一方面受限于我们的时间,比如忙着写 Bug,哪有时间买别墅呢?只能委托房产中介了。委托他人代办有一个非常大的好处,那就是可以专心做自己的事了。

在编程领域也有很多类似的需求,比如写一个 HTTP Server,很显然只能在主线程中接收请求,而不能处理 HTTP 请求,因为如果在主线程中处理 HTTP 请求的话,那同一时间只能处理一个请求,太慢了!怎么办呢?可以利用代办的思路,创建一个子线程,委托子线程去处理 HTTP 请求。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(
    new InetSocketAddress(8080));
// 处理请求    
try {
  while (true) {
    // 接收请求
    SocketChannel sc = ssc.accept();
    // 每个请求都创建一个线程
    new Thread(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模拟处理请求
        Thread.sleep(2000);
        // 写 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip();
        sc.write(wb);
        // 关闭 Socket
        sc.close();
      }catch(Exception e){
        throw new UncheckedIOException(e);
      }
    }).start();
  }
} finally {
  ssc.close();
}   

于是,你开始质疑 Thread-Per-Message 模式,而且开始重新思索解决方案,这时候很可能你会想到 Java 提供的线程池。你的这个思路没有问题,但是引入线程池难免会增加复杂度。其实你完全可以换一个角度来思考这个问题,语言、工具、框架本身应该是帮助我们更敏捷地实现方案的,而不是用来否定方案的,Thread-Per-Message 模式作为一种最简单的分工方案,Java 语言支持不了,显然是 Java 语言本身的问题。

Java 语言里,Java 线程是和操作系统线程一一对应的,这种做法本质上是将 Java 线程的调度权完全委托给操作系统,而操作系统在这方面非常成熟,所以这种做法的好处是稳定、可靠,但是也继承了操作系统线程的缺点创建成本高。为了解决这个缺点,Java 并发包里提供了线程池等工具类。这个思路在很长一段时间里都是很稳妥的方案,但是这个方案并不是唯一的方案。

业界还有另外一种方案,叫做轻量级线程。这个方案在 Java 领域知名度并不高,但是在其他编程语言里却叫得很响,例如 Go 语言、Lua 语言里的协程,本质上就是一种轻量级的线程。轻量级的线程,创建的成本很低,基本上和创建一个普通对象的成本相似;并且创建的速度和内存占用相比操作系统线程至少有一个数量级的提升,所以基于轻量级线程实现 Thread-Per-Message 模式就完全没有问题了。

Java 语言目前也已经意识到轻量级线程的重要性了,OpenJDK 有个 Loom 项目,就是要解决 Java 语言的轻量级线程问题,在这个项目中,轻量级线程被叫做Fiber。下面我们就来看看基于 Fiber 如何实现 Thread-Per-Message 模式。

Loom 项目在设计轻量级线程时,充分考量了当前 Java 线程的使用方式,采取的是尽量兼容的态度,所以使用上还是挺简单的。用 Fiber 实现 echo 服务的示例代码如下所示,对比 Thread 的实现,你会发现改动量非常小,只需要把 new Thread(()->{…}).start() 换成 Fiber.schedule(()->{}) 就可以了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(
    new InetSocketAddress(8080));
// 处理请求
try{
  while (true) {
    // 接收请求
    final SocketChannel sc = 
      serverSocketChannel.accept();
      // 开启协程处理
    Fiber.schedule(()->{
      try {
        // 读 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模拟处理请求
        LockSupport.parkNanos(2000*1000000);
        // 写 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip()
        sc.write(wb);
        // 关闭 Socket
        sc.close();
      } catch(Exception e){
        throw new UncheckedIOException(e);
      }
    });
  }//while
}finally{
  ssc.close();
}

Thread-Per-Message 模式在 Java 领域并不是那么知名,根本原因在于 Java 语言里的线程是一个重量级的对象,为每一个任务创建一个线程成本太高,尤其是在高并发领域,基本就不具备可行性。不过这个背景条件目前正在发生巨变,Java 语言未来一定会提供轻量级线程,这样基于轻量级线程实现 Thread-Per-Message 模式就是一个非常靠谱的选择。

当然,对于一些并发度没那么高的异步场景,例如定时任务,采用 Thread-Per-Message 模式是完全没有问题的。实际工作中,我就见过完全基于 Thread-Per-Message 模式实现的分布式调度框架,这个框架为每个定时任务都分配了一个独立的线程。

问题引出

使用 Thread-Per-Message 模式会为每一个任务都创建一个线程,在高并发场景中,很容易导致应用 OOM,那有什么办法可以快速解决呢?

  • 快速解决,就只能改jvm内存配置,增大jvm新生代的大小,长期解决,引入NIO或AIO,netty 就是这么干的

参考 loom项目