歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Java並發編程之阻塞隊列

Java並發編程之阻塞隊列

日期:2017/3/1 9:17:58   编辑:Linux編程

閱讀目錄

  • 1、什麼是阻塞隊列?
  • 2、主要的阻塞隊列及其方法
  • 3、阻塞隊列的實現原理
  • 4、阻塞隊列的應用:實現消費者-生產者模式
  • 5、參考資料

1、什麼是阻塞隊列?

  隊列是一種數據結構,它有兩個基本操作:在隊列尾部加入一個元素,從隊列頭部移除一個元素。阻塞隊裡與普通的隊列的區別在於,普通隊列不會對當前線程產生阻塞,在面對類似消費者-生產者模型時,就必須額外的實現同步策略以及線程間喚醒策略。使用阻塞隊列,就會對當前線程產生阻塞,當隊列是空時,從隊列中獲取元素的操作將會被阻塞,當隊列是滿時,往隊列裡添加元素的操作也會被阻塞。

2、主要的阻塞隊列及其方法

  java.util.concurrent包下提供主要的幾種阻塞隊列,主要有以下幾個:

  • ArrayBlockingQueue:基於數組實現的阻塞隊列,在創建ArrayBlockingQueue對象時必須指定其容量大小,還可以指定訪問策略,默認情況下為非公平的,即不保證等待時間最長的線程最優先能夠訪問隊列。
  • LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE。
  • 以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
  • DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

  阻塞隊列包括了非阻塞隊列中的大部分方法,還提供另外若干非常有用的方法:

  • put方法用來向隊尾存入元素,如果隊列滿,則等待;
  • take方法用來從隊首取元素,如果隊列為空,則等待;
  • offer方法用來向隊尾存入元素,如果隊列滿,則等待一定的時間,當時間期限達到時,如果還沒有插入成功,則返回false;否則返回true;
  • poll方法用來從隊首取元素,如果隊列空,則等待一定的時間,當時間期限達到時,如果取到,則返回null;否則返回取得的元素;

  下面看一段代碼:

import java.util.concurrent.ArrayBlockingQueue;

/** 
 * @author 作者:徐劍   E-mail:[email protected] 
 * @version 創建時間:2016年3月20日 下午12:52:53 
 * 類說明 
 */
public class BlockingQueue
{
    public static void main(String[] args) throws InterruptedException
    {
        java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
        for (int i = 0; i < 10; i++)
        {
            // 將指定元素添加到此隊列中
            blockingQueue.put("加入元素" + i);
            System.out.println("向阻塞隊列中添加了元素:" + i);
        }
        System.out.println("程序到此運行結束,即將退出----");
    }
}

  當限制阻塞隊列數量為5時,添加了5個元素之後,繼續添加將會隊列外阻塞等待,此時程序並未終止。

  

  當隊列滿了之後,我們將隊首元素移除,則可以繼續向阻塞隊列中添加元素,代碼如下:

public class BlockingQueue
{
    public static void main(String[] args) throws InterruptedException
    {
        java.util.concurrent.BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(5);
        for (int i = 0; i < 10; i++)
        {
            // 將指定元素添加到此隊列中
            blockingQueue.put("加入元素" + i);
            System.out.println("向阻塞隊列中添加了元素:" + i);
            if(i>=4)
                System.out.println("移除隊首元素"+blockingQueue.take());
        }
        System.out.println("程序到此運行結束,即將退出----");
    }
}

  執行結果如下:

  

3、阻塞隊列的實現原理

  下面主要看一下ArrayBlockingQueue的實現原理。

  首先看一下ArrayBlockingQueue類的成員變量:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 底層存儲結構-數組 */
    final Object[] items;

    /** 隊首元素下標 */
    int takeIndex;

    /** 隊尾元素下標 */
    int putIndex;

    /**隊列元素總數 */
    int count;
/** 重入鎖 */ final ReentrantLock lock; /** notEmpty等待條件 */ private final Condition notEmpty; /** notFull等待條件 */ private final Condition notFull; /** * Shared state for currently active iterators, or null if there * are known not to be any. Allows queue operations to update * iterator state. */ transient Itrs itrs = null;

  可以看到,ArrayBlockingQueue用來存儲元素的實際上是一個數組。

  再看下ArrayBlockingQueue兩個重要方法的實現,put()和take():

public void put(E e) throws InterruptedException
    {
        //先檢查e是否為空
        checkNotNull(e);
        //獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try
        {
            //當隊列已滿,進入條件等待
            while (count == items.length)
                notFull.await();
            //隊列不滿,進行入隊列操作
            enqueue(e);
        } 
        finally
        {
            //釋放鎖
            lock.unlock();
        }
    }

  再看下具體的入隊操作:

private void enqueue(E x)
    {
        final Object[] items = this.items;
        //隊尾入隊
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        //隊列總數+1
        count++;
        //notempty條件的等待集中隨機選擇一個線程,解除其阻塞狀態
        notEmpty.signal();
    }

  下面是take()方法的源代碼:

public E take() throws InterruptedException
    {
        //獲取鎖
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try
        {
            //隊列為空
            while (count == 0)
                //線程加入notEmpty條件等待集
                notEmpty.await();
            //非空,出隊列
            return dequeue();
        } finally
        {
            //釋放鎖
            lock.unlock();
        }
    }

4、阻塞隊列的應用:實現消費者-生產者模式

/** 
 * @author 作者:徐劍   E-mail:[email protected] 
 * @version 創建時間:2016年3月20日 下午2:21:55 
 * 類說明:阻塞隊列實現的消費者-生產者模式
 */
public class Test
{
    private int queueSize = 10;
    private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);
    public static void main(String[] args)
    {
        Test test = new Test();
        Producer producer = test.new Producer();
        Consumer consumer = test.new Consumer();
        producer.start();
        consumer.start();
    }
    class Consumer extends Thread
    {
        @Override
        public void run()
        {
            consume();
        }

        private void consume()
        {
            while (true)
            {
                try
                {
                    queue.take();
                    System.out.println("從隊列取走一個元素,隊列剩余" + queue.size() + "個元素");
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
    class Producer extends Thread
    {
        @Override
        public void run()
        {
            produce();
        }
        private void produce()
        {
            while (true)
            {
                try
                {
                    queue.put(1);
                    System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+ (queueSize - queue.size()));
                } catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
}

Copyright © Linux教程網 All Rights Reserved