Queue(四)

并发Queue

在并发队列上,JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。

ConcurrentLinkedQueue

是一种适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链表的无界线程安全的队列。该队列的元素遵循先进先出的原则,头是最先进入的,尾是最近加入的,该队列不允许Null元素。ConcurrentLinkedQueue重要方法:

  • add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别)。
  • poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class UserConcurrentLinkedQueue {

public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>();
queue.add("a");
queue.offer("b");
queue.offer("c");
queue.offer("d");

System.out.println(queue.poll()); //a 从头部取出元素,并从队列中删除
System.out.println(queue.size()); //3
System.out.println(queue.peek()); //b
System.out.println(queue.size()); //3
}

}

BlockingQueue接口

  • ArrayBlockingQueue:
    基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场景非常适合使用。
  • LinkedBlockingQueue:
    基于链表的阻塞实现,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。它是一个无界队列。

  • SynchronousQueue:
    一种没有缓冲的队列,生产者的数据直接会被消费者获取并消费掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class UseQueue {

public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
queue.offer("a");
queue.put("b");;
queue.add("c");
queue.offer("d");
queue.offer("f");
System.out.println(queue.offer("g", 2, TimeUnit.SECONDS));
}

}
/*
* 打印结果:
* false
* */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class UseQueue {

public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); //不传参数就是无界队列
queue.add("a");
queue.add("b");
queue.offer("c");
queue.put("d");
queue.add("f");

for(Iterator<String> iterator = queue.iterator(); iterator.hasNext();){
String str = iterator.next();
System.out.println(str);
}
}

}
/*
* 打印结果:
* a
* b
* c
* d
* f
* */
  • PriorityBlockingQueue:
    基于优先级的阻塞队列(优先级的判断通过构造函数传入的Comparable对象决定,也就是传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。
  • DelayQueue:
    带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue中的元素必须实现DelayQueue接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class UsePriorityBlockingQueue {

public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Task> task = new PriorityBlockingQueue<>();
//在调用add方法时不排序
task.add(new Task(5,"t1"));
task.add(new Task(1,"t2"));
task.add(new Task(3,"t3"));

//调用take方法时排序
System.out.println(task);
System.out.println(task.take());
System.out.println(task);
System.out.println(task.take());
System.out.println(task);
System.out.println(task.take());
}

}
/**
打印结果
[5 , t1, 1 , t2, 3 , t3]
5 , t1
[3 , t3, 1 , t2]
3 , t3
[1 , t2]
1 , t2
* */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Task implements Comparable<Task>{

private int id;

private String name;

public Task(int id,String name){
this.id = id;
this.name= name;
}

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return this.getId()+" , "+this.getName();
}

@Override
public int compareTo(Task paramT) {
return paramT.getId() > this.getId() ? 1:(paramT.getId() < this.getId() ? -1 : 0);
}
}

BlockingQueue接口的重要方法

  • offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false(本方法不阻塞当前执行方法的线程)。
  • offer(E o,long timeout,TimeUnit unit):可以设定等待时间,如果在指定的时间内,还不能往队列中加入元素,则返回失败。
  • put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。
  • poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,仍取不到时返回null。
  • poll(long timeout,TimeUnit unit):从BlockingQueue取出一个对首对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
  • take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻塞进入等待状态直到BlockingQueue有新的元素加入。
  • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可提升获取数据的效率,不需要分批加锁或释放锁。

Deque双端队列

Deque允许在队列的头部和尾部进行出兑和入队操作。LinkedBlockingDeque是一个线程安全的双端队列,可以说他是最为复杂的一种队列,在内部实现维护了前端和后端节点,但是其没有实现读写分离,因此同一时间只能有一个线程对其操作。在高并发中性能远低于其他BlockingQueue。更低于ConcurrentLinkedQueue,在JDK早期有一个非线程安全的Deque就是ArrayDeque了,java6里添加了LinkedBlockingDeque来弥补多线程场景下线程安全的问题。

-------------本文结束感谢您的阅读-------------