并发编程(四)------并发quene

2018-12-27 07:42:22来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口!

ConcurrentLinkedQueue:

  是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。

  该队列的元素遵循先进先出的原则,头是最先加入的,尾是最近加入的,该队列不允许null元素。

ConcurrentLinkedQueue重要方法:
add() 和 offer() 都是加入元素的方法 (在ConcurrentLinkedQueue中,这俩个方法没有任何区别)
poll() 和 peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。

BlockingQueue:

BlockingQueue接口的重要方法

offer(anObject): 表示如果可能的话, 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true, 否则返回false.(本方法不阻塞当前执行方法的线程)

offer(E o, long timeout, TimeUnit unit), 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

put(anObject): 把anObject加到BlockingQueue里, 如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

MyQueueTest

 1 package com.bfxy.thread.cord.collection;
 2 
 3 public class MyQueueTest {
 4     
 5     public static void main(String[] args) throws InterruptedException {
 6         MyQueue mq = new MyQueue(5);
 7         mq.put("a");
 8         mq.put("b");
 9         mq.put("c");
10         mq.put("d");
11         mq.put("e");
12         
13         System.err.println("当前元素的个数"+mq.size());
14         
15         Thread t1 = new Thread(new Runnable() {
16             
17             @Override
18             public void run() {
19                 mq.put("f");
20                 mq.put("g");
21                 
22             }
23         },"t1");
24         
25         Thread.sleep(1000);
26         
27         Thread t2 = new Thread(new Runnable() {
28             
29             @Override
30             public void run() {
31                 try {
32                     Thread.sleep(1000);
33                     Object o1=mq.take();
34                     Thread.sleep(1000);
35                     Object o2=mq.take();
36                 } catch (InterruptedException e) {
37                     // TODO Auto-generated catch block
38                     e.printStackTrace();
39                 }
40                     
41             }
42         },"t2");
43         
44         t1.start();
45         Thread.sleep(1000);
46         t2.start();
47         Thread.sleep(5000);
48         
49         System.err.println(mq.getQueueList().toString());
50     }
51     
52 
53 }

MyQueue

 1 package com.bfxy.thread.cord.collection;
 2 
 3 import java.util.LinkedList;
 4 import java.util.List;
 5 import java.util.concurrent.atomic.AtomicInteger;
 6 
 7 public class MyQueue {
 8     //1 就是我们整个队列的容器
 9     private final LinkedList<Object> list =new LinkedList<>();
10     
11     //2计数器
12     private final AtomicInteger count = new AtomicInteger();
13     
14     private final int maxSize; //最大容量限制
15     
16     
17     private final int minSize = 0;
18     
19     private final Object lock = new Object(); //
20     
21     public  MyQueue(int maxSize) {
22         this.maxSize = maxSize;        
23     }
24     
25     public void put(Object obj) {
26         synchronized (lock) {
27             while (count.get() ==this.maxSize) {
28                 try {
29                     lock.wait();
30                 } catch (InterruptedException e) {
31                     // TODO Auto-generated catch block
32                     e.printStackTrace();
33                 }    
34             }
35             //添加新的元素进到容器里
36             list.add(obj);
37             count.getAndIncrement();//i++
38             System.err.println("元素"+obj+"已经添加容器中");
39             //进行唤醒可能正在等待的take方法操作中的线程
40             lock.notify();
41             
42         }
43         
44     }
45     
46     public Object take() {
47         Object temp= null;
48         synchronized (lock) {
49             while (count.get() ==this.minSize) {
50                 try {
51                     lock.wait();
52                 } catch (InterruptedException e) {
53                     // TODO Auto-generated catch block
54                     e.printStackTrace();
55                 }    
56             }
57             temp=list.removeFirst();
58             count.getAndDecrement();//i--
59             System.err.println("元素"+temp+"已经从容器中取走");
60             //进行唤醒可能正在等待的put方法操作中的线程
61             lock.notify();
62             
63         }
64         
65         return temp;
66     }
67     
68     public int size() {
69         return count.get();
70     }
71     
72     public List<Object> getQueueList() {
73         return list;
74     }
75     
76 
77 }

 

 

 

  

 

 

 

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:Java面试题总结(附答案)

下一篇:pdf文件下载水印添加的中文与空格问题解决