Single Threaded Execution模式

所谓Single Threaded Execution模式,意思是“单一线程执行”。该模式用于设置限制,以确保同一时间只能让一个线程执行处理。

Single Threaded Execution模式中,存在着一种SharedResource角色。SharedResource角色是会被多个线程同时访问的类,类会包含多个方法。但这些方法主要分为如下的两类:

  1. safeMethod:多个线程同时调用也不会发生问题的方法。
  2. unsafeMethod:多个线程同时调用会发生问题,因此必须加以保护的方法。

safeMethod方法,无需特别关注,因为多个线程同时调用也不会发生问题。

unsafeMethod,在Single Threaded Execution模式中,保证了同一时刻只能被一个线程执行。常用的控制方法为使用synchronized关键字对方法加以修饰。

当SharedResource角色的内部状态会发生变化,并且需要确保安全性的时候,我们就可以使用Single Threaded Execution模式来加以确保。

如下述的Counter类,getValue方法即为safeMethod,而increase方法为unsafeMethod,因为value++操作为非原子操作,在多个线程的并发执行下会发生数据争用问题,导致获取到错误的值。我们可以使用synchronized关键字修饰increase方法,使其变为临界区,同一时刻只允许一个线程进入临界区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Counter {
private int value;
public Counter(int initial) {
this.value = initial;
}

public synchronized increase() {
this.value++;
}

public int getValue() {
return this.value;
}
}

除使用synchronized关键字之外,还可以使用jdk中提供的ReentrantLock,在此不再赘述。

使用Singled Threaded Execution模式的时候,需要注意防范可能的死锁问题。当存在如下的条件时,就会发生死锁。

  1. 存在多个SharedResource。
  2. 线程在持有某个SharedResource的锁的同时,还等待获取其它的SharedResource的锁。
  3. 获取SharedResource角色的锁的顺序并不固定。

破坏以上的三个条件的任意一个,就可以避免死锁的发生。

  1. 将多个sharedResource整合成一个进行获取。将操作需要获取的锁的个数由多个优化为一个,破坏了上述条件的1和2。
  2. 等待获取SharedResource锁的时候加上超时机制。线程在持有某个SharedResource锁的同时,在等待获取其它的SharedResource的锁的时候加上一个超时条件。如在规定的时间内没有获取到需要的锁,则释放自己已经持有的锁。
  3. 固定多个SharedResource的获取顺序。在需要获取多个锁的时候,每次都按照相同的顺序来申请SharedResource的锁。

Immutable模式

Immutable模式中存在着状态不会发生改变的类,即Immutable类。在访问这些类的实例时,由于其状态不会变化,无需进行线程同步处理,程序为无锁操作,将具有较好的性能。

如下述的Person类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class final Person {
private final String name;
private final Integer age;

public Person(String name, Integer age) {
this.name = name;
this.age = age;
}

public String getName() {
return this.name;
}

public Integer getAge(){
return this.age;
}
}

如有多个线程,同时访问Person类的实例,可以看作存在着SharedResource。由于Person类的实例的状态无法发生改变,因此并不需要使用Single Threaded Pattern来进行确保。

jdk中的Stirng类、即使用了Immutable模式。

Guarded Suspension模式

Guarded Suspension模式,意思是“保护性暂停”模式。如果当前线程继续执行会发生问题的话,就先暂停当前线程的执行。

暂停了线程之后,什么时候再重新开始执行呢?当”某个条件”得到了满足之后就可以了。因此,Guarded Suspension模式也被称为多线程的if模式。

例如如下的类Requests类,其中包含了一个局部变量linkedlist和两个方法put和take,linkedlistput方法用于存储请求,put方法向linkedlist中提交请求,take方法从linkedlist中获取请求。不同的线程分别访问take方法和put方法,线程在调用take方法的时候,如果当前linkedlist为空,此时则应该暂停线程的执行,等待linkedlist非空再继续。此时,即可使用Guarded Suspension模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Requests {
private final LinkedList<String> linkedList = new LinkedList<>();
public synchronized take() throws InterruptedException {
while (linkedList.isEmpty()) {
wait();
}
return linkedList.pollFirst();
}
public synchronized put(String request) throws InterruptedException {
linkedList.offerLast(request);
notifyAll();
}
}

当线程A在调用get方法的时候,如当前linkedList为空,则线程A调用wait(),进入等待队列等待。当其它线程调用了notifyAll方法之后,线程A将会被从等待队列中唤醒,重新判断条件是否满足。

以上的Requests中,如果对内部存储的请求的容量有上限要求,那么在put方法中也应该像是在take方法中判断是否为空一样,加入对是否已满的判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Requests {
private final LinkedList<String> linkedList = new LinkedList<>();
private final static Integer CAPACITY = 100;
public synchronized String take() throws InterruptedException {
while (linkedList.isEmpty()) {
wait();
}
String request = linkedList.pollFirst();
notifyAll();
return request;
}

public synchronized void put(String request) throws InterruptedException {
while (linkedList.size() == CAPACITY) {
wait();
}
linkedList.offerLast(request);
notifyAll();
}
}

因在Guarded Suspension模式的模板代码中,经常会出现wait方法,因此此模式也经常性的被称为Guarded wait模式。

Balking模式

在上面的Guarded Suspension模式中,当守护条件没有满足的时候,线程将会陷入阻塞,等待守护条件得到满足。在某些情况下,当发现条件没有满足的时候,我们希望立即返回,而不需要等待。这种模式称为Balking模式。Balking,即为停步返回的意思。

Balking模式的特点即是不进行循环等待。若守护条件不成立,则立刻返回并进入下一个操作。

懒加载模式下的单例模式的实现,可以看作是Balking模式。

1
2
3
4
5
6
7
8
9
10
public class Singleton {
private static Singleton instance;
public static synchronized Singleton getInstance() {
if (instance != null) {
return instance;
}
instance = new Singleton();
return instance;
}
}

在Guarded Suspension模式中,当守护条件不满足的情况下,线程会一直等待直到条件满足。在Balking模式下,当守护条件不满足的情况下,线程直接满足。在一直等待和直接返回之间,则存在着中间地带:等待守护条件满足或等待时间超时。可以称这种模式为带有超时时间的Guarded Suspension模式。那么这种模式如何来实现呢?

还是以上文例子中的Requests例子来描述。在下述的take方法中, 我们假定最长等待时间为60s,超过这个时间,直接返回null。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Requests {
private final LinkedList<String> linkedList = new LinkedList<>();
private static final int TIME_OUT = 60 * 1000;
public synchronized String take() throws InterruptedException {
long start = System.currentTimeMillis();
while (linkedList.isEmpty()) {
long now = System.currentTimeMillis();
long left = TIME_OUT - (now - start);
if (left <= 0) return null;
wait();
}
return linkedList.pollFirst();
}
public synchronized void put(String request) throws InterruptedException {
linkedList.offerLast(request);
notifyAll();
}
}

Producer-Consumer模式

Producer-Consumer模式,即生产者-消费者模式。生产者-消费者模式在日常的开发中使用非常的广泛,java世界中使用非常广泛的线程池,其实质就是生产者-消费者模式的实现。还有我们经常使用的消费队列,如Kafka等,也是典型的生产者-消费者模式的应用。

使用Producer-Consumer模式的好处是:

  1. 生产者、消费者没有直接耦合,而是经过中间人,实现了解耦。单方面的崩溃不会带来连锁反应。
  2. 中间人的存在,消除了生产者生产消息和消费者消费消息之间速度的差异带来的问题。

在Producer-Consumer模式的实现中,经常会使用到Guarded Suspension模式。

一个典型的Producer-Consumer模式的实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class MessageStore {
private static final int CAPACITY_LIMIT = 1000;
private LinkedList<Request> linkedList = new ArrayList<>();

public synchronized put(Request request) throws InterruptedException {
while (linkedList.size() >= CAPACITY_LIMIT) {
wait();
}
linkedList.offerLast(request);
notifyAll();
}

public synchronized take() throws InterruptedException {
while (linkedList.isEmpty()) {
wait();
}
Request request = linkedList.pollFirst();
notifyAll();
return request;
}
}

pubic class Consumer {
private MessageStore messageStore;

public Consumer(MessageStore store) {
this.messageStore = store;
}

public void consume() {
try {
while (true) {
Request request = this.messageStore.take();
// process request
}
} catch (InterruptedException ex) {
logger.error(ex.getMessage(), ex);
}
}
}

public class Producer {
private MessageStore messageStore;

public Producer(MessageStore store) {
this.messageStore = store;
}

public void produce() {
try {
while (true) {
Request request = new Request();
this.messageStore.put(request);
}
} catch (InterruptedException ex) {
logger.error(ex.getMessage(), ex);
}
}
}

public class Request {

}

对于Producer生产的消息,通过中间人,以什么样的顺序传递给Consumer,存在着不同的策略:

  1. 先生产先消费。实践上经常使用队列来进行实现。上述示例代码,使用就是此策略。
  2. 后生产先消费。实践上经常使用栈来进行实现。
  3. 优先级高的先消费。实践上经常使用优先级队列来实现。

在生产中,MessageStore的实现自然不会如此。在jdk中的juc包下,有着众多的支持多线程的集合实现。如ArrayBlockingQueue、LinkedBlockingQueue。多加使用juc下的多线程队列,可以让代码实现上更加简单、高效。如使用LinkedBlockingQueue改写上述的MessageStore。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class MessageStore {
private static final int CAPACITY_LIMIT = 1000;
private LinkedBlockingQueue<Request> linkedList = new LinkedBlockingQueue<>(CAPACITY_1000);

public put(Request request) {
linkedList.put(request);
}

public take() {
Request request = linkedList.take();
return request;
}
}

Read-Write-Lock模式

Read-Write-Lock模式,即读写锁模式。多线程并发读写的情况下,区分读操作和写操作,降低锁的粒度,使用不同的锁来进行控制。在执行读操作的时候,可以申请共享性的读锁,允许多个线程并发的读;在执行写操作的时候,需要申请独占性的写锁。在申请共享读锁的时候,应优先排他写锁的申请。

一个简单的ReadWriteLock的实现示例代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ReadWriteLock {
private int readings = 0;
private int writings = 0;
private int waitings = 0;
public synchronized void readLock() throws InterruptedException {
while (writings > 0 || waitings > 0) {
wait();
}
readings++;
}

public synchronized void readUnlock() throws InterruptedException {
readings++;
nofifyAll();
}

public synchronized void writeLock() throws InterruptedException {
waitings++;
while (readings > 0 || writings > 0) {
wait();
}
waitings--;
writings++;
}

public synchronized void writeUnlock() {
writings--;
nofityAll();
}
}

以上的代码仅为示例,更加准确、高效的ReadWriteLock实现,请学习参阅jdk中的juc包下的ReentrantReadWriteLock实现。

还有一种类似的模式称为Copy-on-Write模式,即写时复制模式。此种模式下,在多线程并发读写的情况下,读操作由于数据的状态不会发生变化,因此无需执行加锁操作,写操作需要申请锁。在临界区内,执行数据的复制,复制完成后,执行数据的替换。因此,此种模式适用于读多写少的场景。同时,由于在写操作的时候会复制整个数据,因此数据不能太大,否则数据的复制将会花费较多的时间和内存的占用。

jdk中的juc包下,存在着CopyOnWriteArrayList和CopyOnWriteArraySet两个写时复制容器。这两个容器的读取操作时无锁实现,因此读的性能很高。

Thread-Per-Message模式

Thread-Per-Message模式,直译是指“每个消息一个线程”,即在消息到来时,为每个消息都单独创建一个线程来进行处理。

1
2
3
4
5
6
7
public void handle(final Message message) {
new Thread() {
public void run() {
// process message
}
}.start();
}

如上述代码,在handle函数中,每个消息都会被一个单独的Thread来进行处理。

此模式的好处是,消息调用线程和消息处理线程是相互独立的。调用线程无需等待消息处理完成,将消息处理委托给新创建的线程之后,就可以返回。这能显著提高响应性,降低延迟时间。很常用的应用场景包括GUI程序、web服务器程序。

此模式同时也存在着如下的特点:

  1. 一般情况下,无需线程处理的结果。如果需要等待线程处理的结果,可考虑使用Future模式。
  2. 消息处理的顺序,并不严格遵守消息到来的顺序。线程的执行顺序,和线程的创建顺序无关,由操作系统来进行调度。
  3. 消息到来说,都会创建新的线程。线程的创建需要一定的耗时。为了减少创建线程的时间,可以提前创建好线程,就是后面将要说到的Worker Thread模式。
  4. 消息过多时,创建的线程数存在过多,导致耗尽系统资源的风险。请衡量使用场景,必要时使用Worker Thread模式。

Worker Thread模式

Worker Thread模式,即工作者线程模式。与上面的Thread-Per-Message相比,它们的不同点如下:

  1. Worker Thread模式中线程是提前创建好的,Thread-Per-Message模式中线程在消息到来时创建。
  2. Worker Thread模式中线程和消息不是一一对应的,线程处理完消息之后,会等待下一个消息带来,线程会重复利用。Thread-Per-Message模式中,线程和消息一一对应,无法复用。

如下述的代码中,Handler类实例在接收到消息的时候,会将消息缓存起来。同时,在Handler类中开启了一定数量的线程,线程会循环的从缓存中读取消息,进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Handler {

public static final int THREAD_COUNT = 100;

private LinkedList<Message> list;

WorkerThread[] threads = new WorkerThread[THREAD_COUNT];

public Handler() {
for (int i = 0; i < THREAD_COUNT; i++) {
threads = new WorkerThread(handler);
}
list = new LinkedList<>();
}

public void handle(Message message) throws InterruptedException {
this.list.offerLast(message);
notifyAll();
}

public synchronized Message getMessage() throws InterruptedException {
while (list.isEmpty()) {
wait();
}
return list.pollFirst();
}
}

public class WorkerThread implements Runnable {
private Handler handler;
public WorkerThread(Handler handler) {
this.handler = handler;
}
public void run() {
for(;;) {
Message message = this.handler.getMessage();
// process the message
}
}
}

可以看到,WorkerThread的数量越多,消息的并发处理能力越强。但是我们知道,操作系统中创建线程是存在开销的,增加线程的个数,就会带来更多的系统占用。同时,WorkerThread线程的数量超过了同时需要处理消息的数量,多余的WorkerThread也没有什么意义。

关于对Worker Thread模式中的WorkerThread,试着去思考如下的问题:

  1. WorkerThread线程的个数,需要创建多少个?
  2. WorkerThread是在系统启动的时候就进行创建,还是在消息到来的时候再进行创建?
  3. WorkerThread线程创建之后,就一直存活吗?能否在适当的时候,关闭某些空闲的线程?
  4. WorkerThread线程可否随着消息的增多,而保持增多?随着消息的减少,而保持减少?

对于Handler类中的消息缓存队列,也请去思考如下问题:

  1. 缓存队列的长度是无限的吗?持续向队列中提交消息,会发生什么?
  2. 假如缓存队列长度是固定且有限的,那超过了队列缓存长度的消息,应该怎么处理?

以上的思考,在jdk中的ThreadPoolExecutor中都有答案,请参阅学习。

Future模式

前面在Thread-Per-Message模式中说到了,在此模式中无法获取到线程的执行结果。如想要获取到执行结果,则可以考虑使用Future模式。

Future模式,即在线程执行之后,不进行等待,而是先返回给调用者Future对象,调用者后续可以在合适的时候使用此Future对象来获取线程执行的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Future handle(final Message message) {
final Future future = new Future();
new Thread() {
public void run() {
// process message
future.setData("Result");
}
}.start();
return future;
}

public class Future {
private String data;
public Future() {

}

public void setData(String data) {
this.data = data;
}

public String get() {
return this.data;
}
}

如上述代码所示,在讲消息提交到handle函数之后,handle函数返回了一个future对象并且返回。在线程执行完成消息处理之后,会将线程执行的结果,通过调用future的setData方法,附加到future中。如想要获取到线程的执行结果,则可通过如下方式调用

1
2
3
4
while (future.get() == null) {
wait();
}
String data = future.get();

在上述的示例代码中,只做了简单示范,future的get函数没有进行阻塞处理,即当线程没有执行完成之后,get操作也直接返回。因此,在客户端获取线程结果的调用中,需要使用Guarded Suspension模式去循环执行结果。也可以选择在get函数中进行阻塞,同步线程的处理结果,客户端调用则无需使用while循环处理。

在jdk的juc包中,提供了Future模式的一系列相关的类和接口,如Callable、Future、FutureTask等,请参阅相关资料学习。

ThreadLocal模式

ThreadLocal模式,可以直译为线程本地存储模式。每一个线程都持有一份数据,数据在各个线程之间不进行共享,自然做到了数据的互不影响。

jdk中的ThreadLocal类即是此模式的有效利用。在ThreadLocal类中,定义了set方法和get方法。set用于向线程中添加关联数据。调用get方法,可以获取到与当前线程关联的数据。

ThreadLocal的实现,保证了在多个线程都持有ThreadLocal的实例的时候,在各个线程中调用实例的get方法,获取到的数据都是线程独有的。

1
2
public void set(T value);
public T get();

如下述示例代码。ThreadLocal中存储了线程的名称,在执行append方法的时候,通过调用thread.get()将会获取到当前执行append操作的线程的名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class LogAppender {

private ThreadLocal<String> threadLocal = new ThreadLocal<>();

public void append(PrintStream stream) {
stream.println(threadLocal.get() + " print out");
}

public void setThreadName(String name) {
this.threadLocal.set(name);
}
}

public class LogThread extends Thread {
private LogAppender appender;
private int index;
private boolean initialized = false;

LogThread(LogAppender appender, int index) {
this.appender = appender;
this.index = index;

}

public void run() {
if(!this.initialized) {
this.initialized = true;
this.setName("Thread-" + this.index);
this.appender.setThreadName(this.getName());
}

this.appender.append(System.out);
}
}

Two-Phase Termination模式

Two-Phase Termination模式,即两阶段终止模式。不同于上面的那些模式,是关于如何创建线程来运行,Two-Phase Termination模式是关于如何优雅的终止线程的运行。确切一些的说,是关于如何在A线程中优雅的关闭B线程,不会让B线程突然暴毙,而是可以让B线程在关闭之前可以执行某些操作。

Thread类中的stop方法,可以用于关闭线程,但现在已经废弃,原因即是调用stop来关闭线程,只能让线程突然暴毙,但无法做到优雅。同时,处于sleep状态的线程,无法即时响应stop方法,会降低系统响应性。

Two-Phase Termination模式,顾名思义,分为两个阶段来进行。第一阶段是向想要关闭的线程发布关闭通知,第二阶段是线程响应通知执行关闭操作。

第一阶段,发布通知,可以概括为设置标志位和调用interrupt方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class WorkerThread extends Thread {
private volatile boolean shouldTerminate = false;

public void setTerminate() {
this.shouldTerminate = true;
interrupt();
}

public boolean getShouldTerminate() {
return this.shouldTerminate;
}

public void run() {
try {
while (!getShouldTerminate()) {
doWork();
}
} catch (InterruptedException ex) {
Thread.currentThread.interrupt();
} finally {
doShutdown();
}
}

private void doShutDown() {
// process shut down
}

private void doWork() {
// do work
}
}

需要注意的是上面interrupt方法的调用,处于sleep状态的线程将会转向runnable状态,从而可以响应操作。处于wait状态的线程,如果只进行了shouldTerminate标志位的设置,而不调用interrupt方法的话,线程也不会中等待队列中出来,所以也必须调用interrupt方法对线程下来“中断wait”的指示。

在“极客时间-java并发编程实战”中讲到:两阶段终止模式是一种应用很广泛的并发设计模式,在java预研中使用两阶段终止模式来优雅地终止线程,需要注意两个关键点:一个是仅检查终止标志是不够的,因为线程的状态可能处于休眠态;另一个是仅检查线程的中断状态也是不够的,因为我们依赖的第三方类库很可能没有正确处理中断异常“。

Active Object模式

Active Object模式,即主动对象模式。在此处,主动对象时指本身具有线程的对象。主动对象模式可将调用者任务提交和任务执行分离,任务的执行在Active Object对象的线程中执行。Active Object可以从外部接收和处理异步任务,并且在合适的时机可以将任务处理的结果返回给调用者。

调用者在进行任务提交的时候,Active Object会将任务封装成统一的对象。如下述示例代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
interface ActiveObject {
Future<String> process();
}

public class ActiveObjectImpl implements ActiveObject {

private final ExecutorService service = Executors.newSingleThreadExecutor();

public Future<String> process() {
Future<String> future = service.submit(() -> "result...");
return future;
}
}



👨‍💻本站使用 Stellar 主题创建

📃本"页面"访问 次 | 👀总访问 次 | 🥷总访客

⏱️本站已运行 小时