天天看點

Java-Queue總結

1. ConcurrentLinkedQueue 

基礎連結清單同步隊列。 

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

//底層連結清單實作  隊列,先進先出
public class Test_03_ConcurrentLinkedQueue {
  public static void main(String[] args) {
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    for (int i = 0; i < 10; i++) {
      queue.offer("value" + i);
    }
    System.out.println(queue);
    System.out.println(queue.size());
    
    //peek()  檢視queue中的首資料
    System.out.println(queue.peek());
    System.out.println(queue.size());
    
    //poll()->擷取queue首資料
    System.out.println(queue.poll());
    System.out.println(queue.size());
  }
}      

   LinkedBlockingQueue 

阻塞隊列,隊列容量不足自動阻塞,隊列容量為 0 自動阻塞

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** 并發容器 - LinkedBlockingQueue
*  阻塞容器。
*  put & take - 自動阻塞。
*  put自動阻塞, 隊列容量滿後,自動阻塞
*  take自動阻塞方法, 隊列容量為0後,自動阻塞。
*/
public class Test_04_LinkBlockingQueue {
  final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  final Random r = new Random();
  
  public static void main(String[] args) {
    final Test_04_LinkBlockingQueue t = new Test_04_LinkBlockingQueue();
    
    new Thread(new Runnable() {
      @Override
      public void run() {
        while(true){
          try {
            t.queue.put("value"+t.r.nextInt(1000));
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }, "producer").start();
    
    for(int i = 0; i < 3; i++){
      new Thread(new Runnable() {
        @Override
        public void run() {
          while(true){
            try {
              System.out.println(Thread.currentThread().getName() + 
                  " - " + t.queue.take());
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        }
      }, "consumer"+i).start();
    }
  }
}      

   ArrayBlockingQueue 

底層數組實作的有界隊列。自動阻塞。根據調用 API(add/put/offer)不同,有不同特 性。 當容量不足的時候,有阻塞能力。 add 方法在容量不足的時候,抛出異常。 put 方法在容量不足的時候,阻塞等待。 offer 方法, 單參數 offer 方法,不阻塞。容量不足的時候,傳回 false。目前新增資料操作放棄。 三參數 offer 方法(offer(value,times,timeunit)),容量不足的時候,阻塞 times 時長(單 位為 timeunit),如果在阻塞時長内,有容量空閑,新增資料傳回 true。如果阻塞時長範圍 内,無容量空閑,放棄新增資料,傳回 false

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test_05_ArrayBlockingQueue {
  //當容量不足的時候,有阻塞能力
  //add 方法在容量不足的時候,抛出異常
  //put 方法在容量不如的時候,阻塞等待
  //offer 方法,單參數,容量不足的時候,傳回false。目前新增資料操作放棄
  //offer 三參數,容量不足的時候,阻塞times時長(機關為timeunit),如果在阻塞時長内,
  //有容量空閑,新增資料傳回true,如果阻塞時長範圍内,無容量空閑,放棄新增資料,傳回false
  
  final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);

  public static void main(String[] args) {
    final Test_05_ArrayBlockingQueue t = new Test_05_ArrayBlockingQueue();

    for (int i = 0; i < 5; i++) {
      // System.out.println("add method : " + t.queue.add("value"+i));
      /*
       * try { t.queue.put("put"+i); } catch (InterruptedException e) {
       * e.printStackTrace(); } System.out.println("put method : " + i);
       */
      // System.out.println("offer method : " + t.queue.offer("value"+i));
      try {
        System.out.println("offer method : " + t.queue.offer("value" + i, 1, TimeUnit.SECONDS));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    System.out.println(t.queue);
  }
}      

  DelayQueue

延時隊列。根據比較機制,實作自定義處理順序的隊列。常用于定時任務。 如:定時關機

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

//有順序有時間的排序
//延時隊列,根據比較機制,實作自定義處理順序的隊列。常用于定時任務
public class Test_06_DelayQueue {
  static BlockingQueue<MyTask_06> queue = new DelayQueue<>();

  public static void main(String[] args) throws InterruptedException {
    long value = System.currentTimeMillis();
    MyTask_06 task1 = new MyTask_06(value + 2000);
    MyTask_06 task2 = new MyTask_06(value + 1000);
    MyTask_06 task3 = new MyTask_06(value + 3000);
    MyTask_06 task4 = new MyTask_06(value + 2500);
    MyTask_06 task5 = new MyTask_06(value + 1500);

    queue.put(task1);
    queue.put(task2);
    queue.put(task3);
    queue.put(task4);
    queue.put(task5);

    System.out.println(queue);
    System.out.println(value);
    for (int i = 0; i < 5; i++) {
      System.out.println(queue.take());
    }
  }
}

class MyTask_06 implements Delayed {

  private long compareValue;

  public MyTask_06(long compareValue) {
    this.compareValue = compareValue;
  }

  /**
   * 比較大小。自動實作升序 建議和getDelay方法配合完成。
   * 如果在DelayQueue是需要按時間完成的計劃任務,必須配合getDelay方法完成。
   */
  @Override
  public int compareTo(Delayed o) {
    return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  }

  /**
   * 擷取計劃時長的方法。 根據參數TimeUnit來決定,如何傳回結果值。
   */
  @Override
  public long getDelay(TimeUnit unit) {
    return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  @Override
  public String toString() {
    return "Task compare value is : " + this.compareValue;
  }
}      

   LinkedTransferQueue 

轉移隊列,使用 transfer 方法,實作資料的即時處理。沒有消費者,就阻塞

/**
 * 并發容器 - LinkedTransferQueue
 *  轉移隊列
 *  add - 隊列會儲存資料,不做阻塞等待。
 *  transfer - 是TransferQueue的特有方法。必須有消費者(take()方法的調用者)。
 *   如果沒有任意線程消費資料,transfer方法阻塞。一般用于處理即時消息。
 */
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

public class Test_07_TransferQueue {
  TransferQueue<String> queue = new LinkedTransferQueue<>();

  public static void main(String[] args) {
    final Test_07_TransferQueue t = new Test_07_TransferQueue();

    /*
     * new Thread(new Runnable() {
     * 
     * @Override public void run() { try {
     * System.out.println(Thread.currentThread().getName() +
     * " thread begin " );
     * System.out.println(Thread.currentThread().getName() + " - " +
     * t.queue.take()); } catch (InterruptedException e) {
     * e.printStackTrace(); } } }, "output thread").start();
     * 
     * try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {
     * e.printStackTrace(); }
     * 
     * try { t.queue.transfer("test string"); } catch (InterruptedException
     * e) { e.printStackTrace(); }
     */

    new Thread(new Runnable() {

      @Override
      public void run() {
        try {
          t.queue.transfer("test string");
          // t.queue.add("test string");
          System.out.println("add ok");
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start();

    try {
      TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          System.out.println(Thread.currentThread().getName() + " thread begin ");
          System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }, "output thread").start();
  }
}      

  SynchronusQueue 

同步隊列,是一個容量為 0 的隊列。是一個特殊的 TransferQueue。 必須現有消費線程等待,才能使用的隊列。 add 方法,無阻塞。若沒有消費線程阻塞等待資料,則抛出異常。 put 方法,有阻塞。若沒有消費線程阻塞等待資料,則阻塞

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test_08_SynchronusQueue {
  
  BlockingQueue<String> queue = new SynchronousQueue<>();
  
  public static void main(String[] args) {
    final Test_08_SynchronusQueue t = new Test_08_SynchronusQueue();
    
    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          System.out.println(Thread.currentThread().getName() + " thread begin " );
          try {
            TimeUnit.SECONDS.sleep(2);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }, "output thread").start();
    
    /*try {
      TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }*/
    // t.queue.add("test add");
    try {
      t.queue.put("test put");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    
    System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size());
  }

}