util.concurrent下的常用工具类(七)

CountDownLacth的使用

CountDownLacth经常用于监听某些初始化操作,等初始化执行完毕后,通知主线程继续工作(一个线程等待其他所有线程的通知后再执行)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class UseCountDownLatch {

public static void main(String[] args) {
//这里的初始化参数表示只要countDown()方法被线程调用两次,相应的调用await()方法的线程就能继续执行
final CountDownLatch countDown = new CountDownLatch(2);

Thread t1 = new Thread(new Runnable(){
@Override
public void run() {
try {
System.out.println("进入线程t1" + "等待其他线程处理完成...");
countDown.await(); //t1线程阻塞
System.out.println("t1线程继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}},"t1");

Thread t2 = new Thread(new Runnable(){
@Override
public void run() {
try {
System.out.println("t2线程执行初始化操作...");
Thread.sleep(4000);
System.out.println("t2线程初始化操作完成,通知t1线程继续执行...");
countDown.countDown(); //call一下t1线程
} catch (InterruptedException e) {
e.printStackTrace();
}
}},"t2");


Thread t3 = new Thread(new Runnable(){
@Override
public void run() {
try {
System.out.println("t3线程执行初始化操作...");
Thread.sleep(5000);
System.out.println("t3线程初始化操作完成,通知t1线程继续执行...");
countDown.countDown();//call一下t1线程
} catch (InterruptedException e) {
e.printStackTrace();
}
}},"t3");

t1.start();
t2.start();
t3.start();
}

}

/*
打印结果:
进入线程t1等待其他线程处理完成...
t3线程执行初始化操作...
t2线程执行初始化操作...
t2线程初始化操作完成,通知t1线程继续执行...
t3线程初始化操作完成,通知t1线程继续执行...
t1线程继续执行...
*/

CyclicBarrier的使用

  • 场景:
    假设每个线程代表一个运动员,当所有运动员都准备好之后才一起出发, 只要有一个运动员没有准备好,其他运动员都等待(所有线程在最后一次调用await()方法之前都做阻塞操作)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class UseCyclicBarrier {

static class Runner implements Runnable{
private CyclicBarrier barrier;
private String name;

public Runner(CyclicBarrier barrier, String name){
this.barrier = barrier;
this.name = name;
}

@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(5));
System.out.println(this.name + "准备ok.");
this.barrier.await(); //当线程第3第三次调用await()时,表示所有线程都已准备好并可以执行
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(this.name + " Go!!!");
}
}

public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3); //3个Runner
ExecutorService executor = Executors.newFixedThreadPool(3); //3个固定现场的线程池

executor.execute(new Runner(barrier, "zhangsan"));
executor.execute(new Runner(barrier, "lisi"));
executor.execute(new Runner(barrier, "wangwu"));

executor.shutdown();
}
}
/*
打印结果:

wangwu准备ok.
zhangsan准备ok.
lisi准备ok.
wangwu Go!!!
lisi Go!!!
zhangsan Go!!!
*/

Callable和Future使用

Future模式,JDK给与我们一个实现的封装,使用简单。Future模式非常适合在处理跟耗时很长的业务逻辑时进行使用,可以有效的减少系统的响应时间,提高系统的吞吐量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class UseFuture implements Callable<String> {

private String query;

public UseFuture(String query) {
this.query = query;
}

//业务逻辑,可能是一个很长或很慢的业务操作
@Override
public String call() throws Exception {
Thread.sleep(5000);
return this.query + "处理完成...";
}

public static void main(String[] args) throws InterruptedException, ExecutionException {
String queryStr = "query";
//1.构造FutureTask,并且传入需要真正执行业务逻辑的类,这个类需要实现Callable接口
FutureTask<String> futureTask1 = new FutureTask<>(new UseFuture(queryStr));
FutureTask<String> futureTask2 = new FutureTask<>(new UseFuture(queryStr));
//2.创建一个固定线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
//3.提交任务(这里提交任务后,UseFuture的call()方法才会执行)
//submit和execute的区别:1).submit可以传入实现Callable接口的实例对象
// 2).submit方法有返回值
Future<?> f1 = executor.submit(futureTask1);
Future<?> f2 = executor.submit(futureTask2);
System.out.println("请求完毕");

while(true){
if(f1.get() == null && f2.get() == null){ //f1.get()返回null表示当前任务执行完毕
System.out.println("线程执行完毕");
break;
}
}

System.out.println("获取到的数据: " + futureTask1.get() + "futureTask1和futureTask2同时打印"); //异步获取执行结果
System.out.println("获取到的数据: " + futureTask2.get() + "futureTask1和futureTask2同时打印"); //异步获取执行结果
executor.shutdown();
}

}

/*
打印结果:
请求完毕
线程执行完毕
获取到的数据: query处理完成...futureTask1和futureTask2同时打印
获取到的数据: query处理完成...futureTask1和futureTask2同时打印
*/

Semaphore使用

Semphore可以控制系统的流量。拿到信号量的线程可以进入,否则就等待。通过acquire()和release()获取和释放访问许可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class UseSemaphore {

public static void main(String[] args) {
//线程池
ExecutorService exec = Executors.newCachedThreadPool();
//只能有5个线程同时访问业务模块
final Semaphore semp = new Semaphore(5);
//模拟多个客户端访问业务模块
for(int i = 0; i < 20; i++){
final int NO = i;
Runnable run = new Runnable(){
@Override
public void run() {

try {
System.out.println("=========");
//获得许可
semp.acquire();
//模拟业务模块
System.out.println("Accessing: " + NO);
Thread.sleep(1000 * (new Random()).nextInt(5));
//访问完毕,释放
semp.release();
System.out.println("=========");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
};
exec.execute(run);
}
}
}
-------------本文结束感谢您的阅读-------------