Java 多线程工具

1. 并发工具

对于很多的多线程问题,我们不需要再去实现一遍底层的锁和同步机制了。
对于一般的应用向问题,应优先采用并发工具

Executor 和 Task 优先于线程(Runnable)
并发工具优先于 wait()notify()
—— Effective Java Second Edition

1.1 阻塞队列

对于许多线程问题,可以通过使用一个或者多个队列来将其形式化。可以通过 Producesor 将任务加入队列,然后由 Comsumor 来将任务取出然后进行处理的方式来实现。

Java 的阻塞队列自带了阻塞特性,不再需要显式的同步

1.1.1 API

这里只介绍阻塞队列的阻塞方法,实际上阻塞队列也包含一些非阻塞的方法

方法正常动作特殊情况下的动作
put添加一个元素如果队列满,则阻塞
take移出并返回头元素如果队列空,则阻塞
offer添加一个元素,并返回 true如果队列满,则返回 false
poll移出并返回队列的头元素如果队列空,则返回 null
peek返回队列的头元素(不移出如果队列空,则返回 null

注意:

  1. offer 、peek、poll 在特殊情况下并不阻塞,但是它们有对应的超时版本
  1. 由于 peak poll 带有 返回 null 的属性,所以不能往这样的队列插入 null
  2. 这个队列还具有 add()remove 方法,但是它们在特殊情况下会抛出异常,所以在多线程程序中不要使用这样的方法。

Java 准备了多种实现形式的阻塞队列,包括链表、双端链表、数组等实现,甚至包括优先队列。

同时,Java 1.7 还提供了 TransferQueue 接口,这个接口允许生产者线程等待,直到消费者线程准备就绪。

1.1.2 例子

下面是一个使用阻塞队列来管理多线程关系的例子:
即,生产者线程将元素加入到队列中,消费者线程将元素取出进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {
BlockingQueue bq = new ArrayBlockingQueue(1000);
Producer producer = new Producer(bq);
Consumer consumer = new Consumer(bq);

new Thread(producer).start();
new Thread(consumer).start();

Thread.sleep(4000);
}
}


/**
* Producer generate the sum.
* And add it into the queue
*/
public class Producer implements Runnable {
private BlockingQueue bq = null;

public Producer(BlockingQueue queue) {
this.setBlockingQueue(queue);
}

// The blocking queue has a internal synchronize
// The delay of each end of the addition will show this
public void run() {
Random rand = new Random();
int res = 0;
try {
res = Addition(rand.nextInt(100), rand.nextInt(50));
System.out.println("Produced: " + res);
bq.put(res);
Thread.sleep(1000);

res = Addition(rand.nextInt(100), rand.nextInt(50));
System.out.println("Produced: " + res);
bq.put(res);
Thread.sleep(1000);

res = Addition(rand.nextInt(100), rand.nextInt(50));
System.out.println("Produced: " + res);
bq.put(res);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void setBlockingQueue(BlockingQueue bq) {
this.bq = bq;
}

public int Addition(int x, int y) {
int result = 0;
result = x + y;
return result;
}
}

/**
* Comsumer take the result from the queue.
* And print it out to the output
*/
public class Consumer implements Runnable {
protected BlockingQueue queue = null;

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
System.out.println("Consumed: " + queue.take());
System.out.println("Consumed: " + queue.take());
System.out.println("Consumed: " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

The output:

1
2
3
4
5
6
Produced: 93
Consumed: 93
Produced: 69
Consumed: 69
Produced: 76
Consumed: 76

1.2 线程安全的集合

java.util.concurrent 包提供了许多线程安全的集合。
主要用于多线程并发修改一个数据结构的并发问题。
包括 哈希表、有序集和队列等

一般来说,线程安全的集合要比一般的集合更高效

在较早的 Java 版本,曾有“同步包装器”使得一般的集合类型变为同步的,但是现在已经不推荐使用了,最好使用 java.util.concurrent 包中的集合

2. CallableFuture

2.1 Callable

Callable 是一个带返回值Runnable,具有泛型特性。
例如 Callable<Integer> 表示一个最终返回 Interger 的异步计算

2.2 Future

Future 保存异步任务的结果,可以将其启动然后交给一个线程。
所有者在任务执行完毕后,可以通过 get() 方法获得结果。

Future 具有以下方法

1
2
3
4
5
6
7
public interface Future<V> {
V get() throws ...;
V get(long timeout, TimeUnit unit) throws ...;
void cancel(boolean mayInterrupt);
boolean isCancelled();
boolean isDone();
}

第一个 get() 调用直到计算完成前会被阻塞
如果任务完成前第二个 get() 超时,则抛出 TimeoutException
如果线程被中断,则都抛出 InterruptedException
如果任务已经完成,那么 get() 立即返回

可以使用 cancel() 方法来中断任务,
如果任务没有开始,则它将被取消而不会再运行,
如果任务已经在运行,那么则由 mayInterrupt 参数来决定是否中断任务
如果任务已经被取消或者已经完成,那么返回 false,其他情况返回 true

注意,此方法一旦返回,则 isDone() 永远返回 true

2.3 FutureTask

Java 实现了 FutureTask 包装器,它是一个类,同时实现了 RunnableFuture 接口
它接受一个 Callable 接口作为构建器参数,主要用于将 Callable 转换为 RunnalbeFuture

可以如下使用

1
2
3
4
5
6
Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task) // It's a Runnale
t.start();
...
Integer result = task.get(); // It's a Future

3. 执行器(Executor)

如果你需要做一些重复性较高的异步任务,或者创建大量的生命期很短的线程,那么就应该用线程池来管理。
实际上,为了提高效率,执行任何的并发任务,都应该优先考虑 Execulator 和 Task

Execulator 和 Task 优先于线程(Thread)
—— Effective Java Second Edition

在这里,并发的最小单位升级为 ExecutorTask
所谓的 Task 就是用户构建的 Runnable 或者 Callable 对象;
这也是为什么要优先采用 Runnable 的原因

3.1 基本使用

基本的使用步骤如下:

  1. 使用 Executors 的静态方法构建线程池,或者叫 ExecutorService
  2. 调用 execute()submit() 提交 RunnableCallable 对象
  3. 当不在提交任务时,调用 shutdown()

注意,还有一个 execute() 方法执行 submit() 的效果。
它们的主要区别在于,
execute() 会触发未捕获处理器,从而向 System.err 输出错误信息;
submit() 会抛出 ExecutionException,可以使用 getCause() 获取出错信息

另外, submit() 返回的是 Future 对象,可以通过它取消特定任务。
由此,如果使用 Callable 那么使用 submit()
如果使用 Runnable 那么使用 execute()

例子:

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});

executorService.shutdown();

3.2 ScheduledExecutorService 预定执行

该类是 ExecutorService 的子类,用于构建预订性重复性、周期性 的任务

可以指定任务只运行一次,也可以指定任务的运行周期

3.3 控制任务组

使用 ExecutorService 的另一个重要原因就是可以实现控制一组相关任务。
特别是在采用分治策略的算法中常常能用到。

例如,使用对一个大整数进行因式分解,那么我们可以将整个过程分成很多很小的过程,当小任务全部解决完毕时,整数的因式分解也就完毕了。

或者,我们可以用它来提交很多对于同一个问题的不同解决方案,如果有任何一个解决方案得出答案,那整个任务就可以停止了。

对于以上两种情况,使用 ExecutorService 分别有两种方法进行对应:

  1. invokeAll(),这个方法提交所有的 Callable 到一个集合中,并返回一个 Future 对象,代表所有任务解决结果
  2. invokeAny(),这个方法提交所有的 Callable 到一个集合中,并返回一个 Future 对象,代表某一个任务的解决结果

例子:

1
2
3
4
5
6
// invokeAll -- Return a List of Future
List<Callable<T>> task = ...;
List<Future<T>> results = executor.invokeAll(task);

// invokeAny -- Return only one Future
Future<T> resultOfInvokeAny = executor.invokeAny(task);

可以使用 ExecutorCompletionService 来对 invokeAll() 得到的结果集进行排列处理

1
2
3
4
5
6
7
8
9
10
// executor is a ExecutorService
ExecutorCompletionService service = new ExecutroCompletionService(executor);

for (Callable<T> task : tasks) {
service.submit(task);
}

for (int i = 0; i < tasks.size(); i++) {
processFurther(service.take().get());
}

3.4 Fork-Join 框架

对于多线程处理的分治策略的任务, Java 实现了一种 Fork-Join 框架来更好的实现这种任务流程。

分治的很常见的实现方式是递归实现,这个框架也使用了递归的思路

使用步骤:

  1. 提供一个扩展了 RecursiveTask<T> 或者 RecursiveAction 的类
  2. Override compute() 方法,在其中调用子任务并将其合并

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Counter extends RecursiveTask<Integer> {
...
@Override
protected Integer computer() {
if (to - from < THRESHOLD) {
// solve problem directly
}
else {
int mid = from + (to - from) / 2;
// Recursive solve left
Counter first = new Counter(values, form, mid, filter);
// Recursive solve right
Counter second = new Counter(values, mid, to, filter);
invokeAll(first, second); // Add both to executor
return first.join() + second.join(); // bind the solution
}
}
}

4. 同步器(Synchronizer)

同步器是并发工具的一种,一些使线程能够等待另一个线程的对象,允许它们协调动作。

同步器
作用何时使用
CyclicBarrier (不常用)允许线程集等待直到其中预定数目的线程到达一个公共障栅(barrier),然后可以选择执行一个处理 barrier 的动作当大量的线程需要在它们的结果可用之前完成时
CountDownLatch (常用)允许线程集等待直到计数器为 0当线程需要等待事件发生(才允许执行时)
Exchanger(不常用)允许两个线程在要交换的对象准备好时交换对象当两个线程工作在同一个数据结构的两个实例上时
Semaphore(常用)允许线程集等待知道它被允许继续执行为止限制访问资源的线程总数
SynchronousQueue允许一个线程将对象交给另一个线程在没有显式同步的情况下,当两个线程准备好将一个对象传递到另一个时

注意 CountDownLatch,这个类用于让某些线程等待其他线程
它是唯一一个带有 int 构造参数的同步器,用于指定等待的并发线程的个数

形象来说,就是一个红绿灯,直到倒计时完毕,线程才可以运行
下面是一个简单的多线程计时的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* A simple timing concurrent execution.
* The timer will not start until all the worker thread are ready.
* And when the last worker thread done, the timer stop
*/
public static long time(Executor executor, int concurrency, final Runnalbe action) throws InterruptedException {
final CountDownLatch ready = new CounDownLatch(concurrency);
final CountDownLatch start = new CounDownLatch(1);
final CountDownLatch done = new CounDownLatch(concurrency);

for (int i = 0; i < concurrency; i++) {
executor.execute(new Runnable() {
// This is the worker thread
public void run() {
ready.countDown(); // Tell the timer worker is ready
try {
start.await(); // Worker stuck at start point

// Because of blocking,
// this statement will not run
// until the start count down reach 0
action.run();
} catch (InterruptedException) {
Thread.currentThread().interrupt();
} finally {
done.countDown(); // Tell the timer worker is done
}
}
});
}

// This is the timer thread
ready.await(); // Wait for all the workers are done
long startNanos = System.nanoTime();
start.countDown(); // Let worker thread off!!
done.await(); // Wait for worker done.
return System.nanoTime() - startNanos;
}

线程中调用锁存器的 await() 方法可以阻塞当前线程
当锁存器的计数器为 0 时,所有的被该锁存器阻塞的线程即刻执行

锁存器是共享的,在任何线程中都可被更改
一旦归 0,障碍即刻被放弃