21-2 Java 线程的协作

多线程协作的基本机制 wait/notify

多线程之间除了竞争访问同一个资源外,也经常需要相互协作,怎么协作呢?本节就来介绍Java中多线程协作的基本机制 wait/notify

wait 实际上做了什么呢?它在等待什么?之前我们说过,每个对象都有一把锁和等待队列,一个线程在进入 synchronized 代码块时,会尝试获取锁,如果获取不到则会把当前线程加入等待队列中,其实,除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。

notify 做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notify 和 notifyAll 的区别是,它会移除条件队列中所有的线程并全部唤醒。

wait/notify 方法只能在 synchronized 代码块内被调用,如果调用 wait/notify 方法时,当前线程没有持有对象锁,会抛出异常 java.lang.IllegalMonitor-StateException。

同时开始

每个线程在开始前进行 wait,然后主线程通过 notifyAll 唤醒所有。

同时结束

我们之前通过主线程等待子线程使用的是 join,但是 join 有时比较麻烦,需要主线程逐一等待每个子线程。

主线程先等待,只有等到所有子线程结束。然后一个条件,必须先 wait,再 notify。

异步结果

一种常见的模式是异步调用,异步调用返回一个一般称为 Future 的对象,通过它可以获得最终的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package qy.basic.ch21;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

/**
* 线程池自定义任务 简单 demo
*/
public class Ch21_10_Executor {

interface MyFuture<V> {
// 阻塞直到线程运行结束
V get() throws InterruptedException, ExecutionException;
}

static class MyExecutor {
// 它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程
public <V> MyFuture<V> submit(final Callable<V> callable) {
Object lock = new Object();
ExecutorThread<V> thread = new ExecutorThread<>(callable, lock);
thread.start();

MyFuture<V> future = new MyFuture<V>() {

@Override
public V get() throws InterruptedException, ExecutionException {
synchronized (lock) {
while(!thread.isDone) {
lock.wait();
}
if (thread.getException() != null) {
throw new ExecutionException(thread.getException());
}
V v = thread.getResult();
return v;
}
}
};
return future;
}
}

static class ExecutorThread<V> extends Thread {
private V result;
private Exception exception;
boolean isDone = false;
private Callable<V> callable;
private Object lock;

public ExecutorThread(Callable<V> callable, Object lock) {
this.callable = callable;
this.lock = lock;
}

@Override
public void run() {
try {
result = callable.call();
} catch (Exception e) {
exception = e;
} finally {
synchronized (lock) {
isDone = true;
lock.notifyAll();
}
}
}

public V getResult() {
return result;
}

public Exception getException() {
return exception;
}

public boolean isDone() {
return isDone;
}
}

public static void main(String[] args) throws Exception {
MyExecutor executor = new MyExecutor();
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "hello MyExecutor!";
}
};
MyFuture<String> future = executor.submit(callable);
// 获取异步调取结果
String result = future.get();
System.out.println("result = " + result);
}

}

集合点

各个线程先是分头行动,各自到达一个集合点,在集合点需要集齐所有线程,交换数据,然后再进行下一步动作。

线程中断

stop 方法看上去就可以停止线程,但这个方法被标记为了过时,简单地说,我们不应该使用它,可以忽略它。

在 Java 中,停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出。

  • void interrupt()方法 :中断线程,例如,当线程A运行时,线程B可以调用线程A的interrupt()方法来设置线程A的中断标志为 true 并立即返回。设置标志仅仅是设置标志,线程A实际并没有被中断,它会继续往下执行。如果线程处于了阻塞状态(如线程调用了thread.sleep、thread.join、thread.wait、1.5中的 condition.await、以及可中断的通道上的 I/O 操作方法后可进入阻塞状态),这时候若线程B调用线程A的interrupt()方法,线程A在检查中断标示时如果发现中断标示为true,则会在这些阻塞方法(sleep、join、wait、1.5中的condition.await 及可中断的通道上的 I/O 操作方法)调用处抛出 InterruptedException 异常。并且在抛出异常后立即将线程的中断标示位清除,即重新设置为 false。抛出异常是为了线程从阻塞状态醒过来,并在结束线程前让程序员有足够的时间来处理中断请求

  • boolean isInterrupted()方法:检测当前线程是否被中断,如果是返回 true,否则返回 false。并不清除中断标志位。

1
2
3
public boolean isInterrupted() {
return isInterrupted(false);
}
  • boolean interrupted()方法:检测当前线程是否被中断,如果是返回 true,否则返回 false。与 isInterrupted 不同的是,该方法如果发现当前线程被中断,则会清除中断标志,并且该方法是 static 静态方法,可以通过 Thread 类直接调用。另外从下面的代码可以知道,在 interrupted()内部是获取当前调用线程的中断标志而不是调用 interrupted()方法的实例对象的中断标志。
1
2
3
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

线程对中断的反应

interrupt()对线程的影响与线程的状态和在进行的IO操作有关。我们主要考虑线程的状态,IO操作的影响和具体IO以及操作系统有关,我们就不讨论了。线程状态有:

❑ RUNNABLE:线程在运行或具备运行条件只是在等待操作系统调度。
❑ WAITING/TIMED_WAITING:线程在等待某个条件或超时。
❑ BLOCKED:线程在等待锁,试图进入同步块。
❑ NEW/TERMINATED:线程还未启动或已结束。

RUNNABLE:如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标志位,没有任何其他作用。

WAITING/TIMED_WAITING:线程调用join/wait/sleep方法会进入 WAITING 或 TIMED_WAITING状态,在这些状态时,对线程对象调用interrupt()会使得该线程抛出InterruptedException。需要注意的是,抛出异常后,中断标志位会被清空,而不是被设置。

捕获到 InterruptedException,通常表示希望结束该线程,线程大致有两种处理方式:
1)向上传递该异常,这使得该方法也变成了一个可中断的方法,需要调用者进行处理;
2)有些情况,不能向上传递异常,比如 Thread 的 run 方法,它的声明是固定的,不能抛出任何受检异常,这时,应该捕获异常,进行合适的清理操作,清理后,一般应该调用 Thread 的 interrupt 方法设置中断标志位,使得其他代码有办法知道它发生了中断。

BLOCKED:如果线程在等待锁,对线程对象调用interrupt()只是会设置线程的中断标志位,线程依然会处于BLOCKED状态,也就是说,interrupt()并不能使一个在等待锁的线程真正“中断”。

NEW/TERMINATED:如果线程尚未启动(NEW),或者已经结束(TERMINATED),则调用interrupt()对它没有任何效果,中断标志位也不会被设置。

参考