很多开发者谈到Java多线程开发,仅仅停留在new Thread(…).start()或直接使用Executor框架这个层面,对于线程的管理和控制却不够深入,通过读《Java并发编程实践》了解到了很多不为我知但又非常重要的细节,今日整理如下。

不应用线程池的缺点

有些开发者图省事,遇到需要多线程处理的地方,直接new Thread(…).start(),对于一般场景是没问题的,但如果是在并发请求很高的情况下,就会有些隐患:

  • 新建线程的开销。线程虽然比进程要轻量许多,但对于JVM来说,新建一个线程的代价还是挺大的,决不同于新建一个对象
  • 资源消耗量。没有一个池来限制线程的数量,会导致线程的数量直接取决于应用的并发量,这样有潜在的线程数据巨大的可能,那么资源消耗量将是巨大的
  • 稳定性。当线程数量超过系统资源所能承受的程度,稳定性就会成问题

制定执行策略

在每个需要多线程处理的地方,不管并发量有多大,需要考虑线程的执行策略

  • 任务以什么顺序执行
  • 可以有多少个任何并发执行
  • 可以有多少个任务进入等待执行队列
  • 系统过载的时候,应该放弃哪些任务?如何通知到应用程序?
  • 一个任务的执行前后应该做什么处理

线程池的类型

不管是通过Executors创建线程池,还是通过Spring来管理,都得清楚知道有哪几种线程池:

  • FixedThreadPool:定长线程池,提交任务时创建线程,直到池的最大容量,如果有线程非预期结束,会补充新线程
  • CachedThreadPool:可变线程池,它犹如一个弹簧,如果没有任务需求时,它回收空闲线程,如果需求增加,则按需增加线程,不对池的大小做限制
  • SingleThreadExecutor:单线程。处理不过来的任务会进入FIFO队列等待执行
  • SecheduledThreadPool:周期性线程池。支持执行周期性线程任务

其实,这些不同类型的线程池都是通过构建一个ThreadPoolExecutor来完成的,所不同的是corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory这么几个参数。具体可以参见JDK DOC。

线程池饱和策略

由以上线程池类型可知,除了CachedThreadPool其他线程池都有饱和的可能,当饱和以后就需要相应的策略处理请求线程的任务,ThreadPoolExecutor采取的方式通过队列来存储这些任务,当然会根据池类型不同选择不同的队列,比如FixedThreadPool和SingleThreadExecutor默认采用的是无限长度的LinkedBlockingQueue。但从系统可控性讲,最好的做法是使用定长的ArrayBlockingQueue或有限的LinkedBlockingQueue,并且当达到上限时通过ThreadPoolExecutor.setRejectedExecutionHandler方法设置一个拒绝任务的策略,JDK提供了AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy几种策略,具体差异可见JDK DOC

线程无依赖性

多线程任务设计上尽量使得各任务是独立无依赖的,所谓依赖性可两个方面:

  • 线程之间的依赖性。如果线程有依赖可能会造成死锁或饥饿
  • 调用者与线程的依赖性。调用者得监视线程的完成情况,影响可并发量

当然,在有些业务里确实需要一定的依赖性,比如调用者需要得到线程完成后结果,传统的Thread是不便完成的,因为run方法无返回值,只能通过一些共享的变量来传递结果,但在Executor框架里可以通过Future和Callable实现需要有返回值的任务,当然线程的异步性导致需要有相应机制来保证调用者能等待任务完成,关于Future和Callable的用法见下面的实例就一目了然了:

 

  1. public class FutureRenderer {  
  2.     private final ExecutorService executor = …;  
  3.     void renderPage(CharSequence source) {  
  4.         final List<ImageInfo> imageInfos = scanForImageInfo(source);  
  5.         Callable<List<ImageData>> task =  
  6.                 new Callable<List<ImageData>>() {  
  7.                     public List<ImageData> call() {  
  8.                         List<ImageData> result  
  9.                                 = new ArrayList<ImageData>();  
  10.                         for (ImageInfo imageInfo : imageInfos)  
  11.                             result.add(imageInfo.downloadImage());  
  12.                         return result;  
  13.                     }  
  14.                 };  
  15.         Future<List<ImageData>> future =  executor.submit(task);  
  16.         renderText(source);  
  17.         try {  
  18.             List<ImageData> imageData =  future.get();  
  19.             for (ImageData data : imageData)  
  20.                 renderImage(data);  
  21.         } catch (InterruptedException e) {  
  22.             // Re-assert the thread’s interrupted status  
  23.             Thread.currentThread().interrupt();  
  24.             // We don’t need the result, so cancel the task too  
  25.             future.cancel(true);  
  26.         } catch (ExecutionException e) {  
  27.             throw launderThrowable(e.getCause());  
  28.         }  
  29.     }  
  30. }  

 

以上代码关键在于List<ImageData> imageData = future.get();如果Callable类型的任务没有执行完时,调用者会阻塞等待。不过这样的方式还是得谨慎使用,很容易造成不良设计。另外对于这种需要等待的场景,就需要设置一个最大容忍时间timeout,设置方法可以在 future.get()加上timeout参数,或是再调用ExecutorService.invokeAll 加上timeout参数

线程的取消与关闭 

一般的情况下是让线程运行完成后自行关闭,但有些时候也会中途取消或关闭线程,比如以下情况:

  • 调用者强制取消。比如一个长时间运行的任务,用户点击”cancel”按钮强行取消
  • 限时任务
  • 发生不可处理的任务
  • 整个应用程序或服务的关闭

因此需要有相应的取消或关闭的方法和策略来控制线程,一般有以下方法:

1)通过变量标识来控制

这种方式比较老土,但使用得非常广泛,主要缺点是对有阻塞的操作控制不好,代码示例如下所示:

 

  1. public class PrimeGenerator implements Runnable {  
  2.      @GuardedBy(“this”)  
  3.      private final List<BigInteger> primes  
  4.              = new ArrayList<BigInteger>();  
  5.      private  volatile boolean cancelled;  
  6.      public void run() {  
  7.          BigInteger p = BigInteger.ONE;  
  8.          while (!cancelled ) {  
  9.              p = p.nextProbablePrime();  
  10.              synchronized (this) {  
  11.                  primes.add(p);  
  12.              }  
  13.          }  
  14.      }  
  15.      public void cancel() { cancelled = true;  }  
  16.      public synchronized List<BigInteger> get() {  
  17.          return new ArrayList<BigInteger>(primes);  
  18.      }  
  19. }  

 

2)中断

中断通常是实现取消最明智的选择,但线程自身需要支持中断处理,并且要处理好中断策略,一般响应中断的方式有两种:

  • 处理完中断清理后继续传递中断异常(InterruptedException)
  • 调用interrupt方法,使得上层能感知到中断异常

3) 取消不可中断阻塞

存在一些不可中断的阻塞,比如:

  • java.io和java.nio中同步读写IO
  • Selector的异步IO
  • 获取锁

对于这些线程的取消,则需要特定情况特定对待,比如对于socket阻塞,如果要安全取消,则需要调用socket.close()

4)JVM的关闭

如果有任务需要在JVM关闭之前做一些清理工作,而不是被JVM强硬关闭掉,可以使用JVM的钩子技术,其实JVM钩子也只是个很普通的技术,也就是用个map把一些需要JVM关闭前启动的任务保存下来,在JVM关闭过程中的某个环节来并发启动这些任务线程。具体使用示例如下:

 

  1. public void start() {  
  2.     Runtime.getRuntime().addShutdownHook(new Thread() {  
  3.         public void run() {  
  4.             try { LogService.this.stop(); }  
  5.             catch (InterruptedException ignored) {}  
  6.         }  
  7.     });  
  8. }