SynchronousQueue的使用

SynchronousQueue的使用https://blog.csdn.net/cn_yaojin/article/details/80852395原文地址:https://blog.csdn.net/zmx729618/article/details/52980158SynchronousQueue是这样一种阻塞队列,其中每个put必须等待一个take,反之亦然。同步队列没有任何内部容量,甚至连一个队…

大家好,又见面了,我是你们的朋友全栈君。

https://blog.csdn.net/cn_yaojin/article/details/80852395

原文地址:https://blog.csdn.net/zmx729618/article/details/52980158

 

     SynchronousQueue是这样一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。 
        不能在同步队列上进行 peek,因为仅在试图要取得元素时,该元素才存在; 
        除非另一个线程试图移除某个元素,否则也不能(使用任何方法)添加元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头是尝试添加到队列中的首个已排队线程元素; 如果没有已排队线程,则不添加元素并且头为 null。 
        对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空集合。此队列不允许 null 元素。
        它非常适合于传递性设计,在这种设计中,在一个线程中运行的对象要将某些信息、事件或任务传递给在另一个线程中运行的对象,它就必须与该对象同步。 
        对于正在等待的生产者和使用者线程而言,此类支持可选的公平排序策略。默认情况下不保证这种排序。但是,使用公平设置为 true 所构造的队列可保证线程以 FIFO 的顺序进行访问。 公平通常会降低吞吐量,但是可以减小可变性并避免得不到服务。 
        注意1:它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。 
        注意2:它是线程安全的,是阻塞的。 
        注意3:不允许使用 null 元素。 
        注意4:公平排序策略是指调用put的线程之间,或take的线程之间。公平排序策略可以查考ArrayBlockingQueue中的公平策略。 
        注意5:SynchronousQueue的以下方法: 
        * iterator() 永远返回空,因为里面没东西。 
        * peek() 永远返回null。 
        * put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。 
        * offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。 
        * offer(2000, TimeUnit.SECONDS) 往queue里放一个element但是等待指定的时间后才返回,返回的逻辑和offer()方法一样。 
        * take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。 
        * poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。 
        * poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且remove掉queue里的element,其实就是再等其他的thread来往里塞。 
        * isEmpty()永远是true。 
        * remainingCapacity() 永远是0。 
        * remove()和removeAll() 永远是false。 

        这是一个很有意思的阻塞队列,其中每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此此队列内部其 实没有任何一个元素,或者说容量是0,严格说并不是一种容器。由于队列没有容量,因此不能调用peek操作,因为只有移除元素时才有元素。

        一个没有容量的并发队列有什么用了?或者说存在的意义是什么?SynchronousQueue 的实现非常复杂,SynchronousQueue 内部没有容量,但是由于一个插入操作总是对应一个移除操作,反过来同样需要满足。那么一个元素就不会再SynchronousQueue 里面长时间停留,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程。也就是说这更像是一种信道(管道),资源从一个方向快速传递到另一方 向。需要特别说明的是,尽管元素在SynchronousQueue 内部不会“停留”,但是并不意味之SynchronousQueue 内部没有队列。实际上SynchronousQueue 维护者线程队列,也就是插入线程或者移除线程在不同时存在的时候就会有线程队列。既然有队列,同样就有公平性和非公平性特性,公平性保证正在等待的插入线 程或者移除线程以FIFO的顺序传递资源。显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入着(生产者)传递给移除着(消费者),这在多任务队列中是最快处理任务的方式。在线程池的相关章节中还会更多的提到此特性。

       它模拟的功能类似于生活中一手交钱一手交货这种情形,像那种货到付款或者先付款后发货模型不适合使用SynchronousQueue。首先要知道SynchronousQueue没有容纳元素的能力,即它的isEmpty()方法总是返回true,但是给人的感觉却像是只能容纳一个元素。

 

 
  1. package rpc_netty.synchronous;

  2.  
  3. import java.util.Random;

  4. import java.util.concurrent.SynchronousQueue;

  5. import java.util.concurrent.TimeUnit;

  6.  
  7. public class SynchronousQueueDemo {

  8.  
  9. public static void main(String[] args) {

  10. SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();

  11. Demo2 demo2 = new Demo2(queue);

  12. Demo1 demo1 = new Demo1(queue);

  13. Thread t1= new Thread(demo2);

  14. Thread t2= new Thread(demo1);

  15. t1.start();

  16. t2.start();

  17. }

  18.  
  19. }

  20. /**

  21. * 模拟生产者

  22. * @author Administrator

  23. *

  24. */

  25. class Demo1 implements Runnable{

  26. SynchronousQueue<Integer> queue = null;

  27.  
  28. public Demo1(){

  29.  
  30. }

  31.  
  32. public Demo1(SynchronousQueue<Integer> queue){

  33. this.queue = queue;

  34. }

  35.  
  36. @Override

  37. public void run() {

  38. int rand = new Random().nextInt(1000);

  39. System.out.println(String.format("模拟生产者:%d",rand));

  40. try{

  41. TimeUnit.SECONDS.sleep(3);

  42. queue.put(rand);

  43. }catch(Exception e){

  44. e.printStackTrace();

  45. }

  46. System.out.println(queue.isEmpty());

  47. }

  48.  
  49. }

  50. /**

  51. * 模拟消费者

  52. * @author Administrator

  53. *

  54. */

  55. class Demo2 implements Runnable{

  56. SynchronousQueue<Integer> queue = null;

  57.  
  58. public Demo2(){

  59.  
  60. }

  61.  
  62. public Demo2(SynchronousQueue<Integer> queue){

  63. this.queue = queue;

  64. }

  65.  
  66. @Override

  67. public void run() {

  68. System.out.println("消费者已经准备好接受元素了...");

  69. try{

  70. System.out.println(String.format("消费一个元素:%d", queue.take()));

  71. }catch(Exception e){

  72. e.printStackTrace();

  73. }

  74. System.out.println("------------------------------------------");

  75. }

  76.  
  77. }

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152131.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号