當前位置:名人名言大全網 - 短信平臺 - java阻塞隊列 線程同步合作

java阻塞隊列 線程同步合作

 Queue接口與List Set同壹級別 都是繼承了Collection接口 LinkedList實現了Queue接口 Queue接口窄化了對LinkedList的方法的訪問權限(即在方法中的參數類型如果是Queue時 就完全只能訪問Queue接口所定義的方法了 而不能直接訪問 LinkedList的非Queue的方法) 以使得只有恰當的方法才可以使用 BlockingQueue 繼承了Queue接口

 隊列是壹種數據結構.它有兩個基本操作 在隊列尾部加人壹個元素 和從隊列頭部移除壹個元素就是說 隊列以壹種先進先出的方式管理數據 如果妳試圖向壹個已經滿了的阻塞隊列中添加壹個元素或者是從壹個空的阻塞隊列中移除壹個元索 將導致線程阻塞.在多線程進行合作時 阻塞隊列是很有用的工具 工作者線程可以定期地把中間結果存到阻塞隊列中而其他工作者線線程把中間結果取出並在將來修改它們 隊列會自動平衡負載 如果第壹個線程集運行得比第二個慢 則第二個線程集在等待結果時就會阻塞 如果第壹個線程集運行得快 那麽它將等待第二個線程集趕上來 下表顯示了jdk 中的阻塞隊列的操作

 add 增加壹個元索 如果隊列已滿 則拋出壹個IIIegaISlabEepeplian異常

 remove? 移除並返回隊列頭部的元素 如果隊列為空 則拋出壹個NoSuchElementException異常

 element? 返回隊列頭部的元素 如果隊列為空 則拋出壹個NoSuchElementException異常

 offer? 添加壹個元素並返回true? 如果隊列已滿 則返回false

 poll 移除並返問隊列頭部的元素 如果隊列為空 則返回null

 peek? 返回隊列頭部的元素 如果隊列為空 則返回null

 put 添加壹個元素? 如果隊列滿 則阻塞

 take 移除並返回隊列頭部的元素 如果隊列為空 則阻塞

 remove element offer poll peek 其實是屬於Queue接口

 阻塞隊列的操作可以根據它們的響應方式分為以下三類 aad removee和element操作在妳試圖為壹個已滿的隊列增加元素或從空隊列取得元素時拋出異常 當然 在多線程程序中 隊列在任何時間都可能變成滿的或空的 所以妳可能想使用offer poll peek方法 這些方法在無法完成任務時只是給出壹個出錯示而不會拋出異常

 註意 poll和peek方法出錯進返回null 因此 向隊列中插入null值是不合法的

 還有帶超時的offer和poll方法變種 例如 下面的調用

 boolean success = q offer(x TimeUnit MILLISECONDS);

 嘗試在 毫秒內向隊列尾部插入壹個元素 如果成功 立即返回true 否則 當到達超時進 返回false 同樣地 調用

 Object head = q poll( TimeUnit MILLISECONDS);

 如果在 毫秒內成功地移除了隊列頭元素 則立即返回頭元素 否則在到達超時時 返回null

 最後 我們有阻塞操作put和take put方法在隊列滿時阻塞 take方法在隊列空時阻塞

 ncurrent包提供了阻塞隊列的 個變種 默認情況下 LinkedBlockingQueue的容量是沒有上限的(說的不準確 在不指定時容量為Integer MAX_VALUE 不要然的話在put時怎麽會受阻呢) 但是也可以選擇指定其最大容量 它是基於鏈表的隊列 此隊列按 FIFO(先進先出)排序元素

 ArrayBlockingQueue在構造時需要指定容量 並可以選擇是否需要公平性 如果公平參數被設置true 等待時間最長的線程會優先得到處理(其實就是通過將ReentrantLock設置為true來達到這種公平性的 即等待時間最長的線程會先操作) 通常 公平性會使妳在性能上付出代價 只有在的確非常需要的時候再使用它 它是基於數組的阻塞循環隊列 此隊列按 FIFO(先進先出)原則對元素進行排序

 PriorityBlockingQueue是壹個帶優先級的隊列 而不是先進先出隊列 元素按優先級順序被移除 該隊列也沒有上限(看了壹下源碼 PriorityBlockingQueue是對PriorityQueue的再次包裝 是基於堆數據結構的 而PriorityQueue是沒有容量限制的 與ArrayList壹樣 所以在優先阻塞隊列上put時是不會受阻的 雖然此隊列邏輯上是無界的 但是由於資源被耗盡 所以試圖執行添加操作可能會導致 OutOfMemoryError) 但是如果隊列為空 那麽取元素的操作take就會阻塞 所以它的檢索操作take是受阻的 另外 往入該隊列中的元素要具有比較能力

 最後 DelayQueue(基於PriorityQueue來實現的)是壹個存放Delayed 元素的無界阻塞隊列 只有在延遲期滿時才能從中提取元素 該隊列的頭部是延遲期滿後保存時間最長的 Delayed 元素 如果延遲都還沒有期滿 則隊列沒有頭部 並且poll將返回null 當壹個元素的 getDelay(TimeUnit NANOSECONDS) 方法返回壹個小於或等於零的值時 則出現期滿 poll就以移除這個元素了 此隊列不允許使用 null 元素 下面是延遲接口

 Java代碼

 public interface Delayed extends Comparable<Delayed> {

 long getDelay(TimeUnit unit);

 }

 public interface Delayed extends Comparable<Delayed> {

 long getDelay(TimeUnit unit);

 }

 放入DelayQueue的元素還將要實現pareTo方法 DelayQueue使用這個來為元素排序

 下面的實例展示了如何使用阻塞隊列來控制線程集 程序在壹個目錄及它的所有子目錄下搜索所有文件 打印出包含指定關鍵字的文件列表 從下面實例可以看出 使用阻塞隊列兩個顯著的好處就是 多線程操作***同的隊列時不需要額外的同步 另外就是隊列會自動平衡負載 即那邊(生產與消費兩邊)處理快了就會被阻塞掉 從而減少兩邊的處理速度差距 下面是具體實現

 Java代碼

 public class BlockingQueueTest {

 public static void main(String[] args) {

 Scanner in = new Scanner(System in);

 System out print( Enter base directory (e g /usr/local/jdk /src): );

 String directory = in nextLine();

 System out print( Enter keyword (e g volatile): );

 String keyword = in nextLine();

 final int FILE_QUEUE_SIZE = ;// 阻塞隊列大小

 final int SEARCH_THREADS = ;// 關鍵字搜索線程個數

 // 基於ArrayBlockingQueue的阻塞隊列

 BlockingQueue<File> queue = new ArrayBlockingQueue<File>(

 FILE_QUEUE_SIZE);

 //只啟動壹個線程來搜索目錄

 FileEnumerationTask enumerator = new FileEnumerationTask(queue

 new File(directory));

 new Thread(enumerator) start();

 //啟動 個線程用來在文件中搜索指定的關鍵字

 for (int i = ; i <= SEARCH_THREADS; i++)

 new Thread(new SearchTask(queue keyword)) start();

 }

 }

 class FileEnumerationTask implements Runnable {

 //啞元文件對象 放在阻塞隊列最後 用來標示文件已被遍歷完

 public static File DUMMY = new File( );

 private BlockingQueue<File> queue;

 private File startingDirectory;

 public FileEnumerationTask(BlockingQueue<File> queue File startingDirectory) {

 this queue = queue;

 this startingDirectory = startingDirectory;

 }

 public void run() {

 try {

 enumerate(startingDirectory);

 queue put(DUMMY);//執行到這裏說明指定的目錄下文件已被遍歷完

 } catch (InterruptedException e) {

 }

 }

 // 將指定目錄下的所有文件以File對象的形式放入阻塞隊列中

 public void enumerate(File directory) throws InterruptedException {

 File[] files = directory listFiles();

 for (File file : files) {

 if (file isDirectory())

 enumerate(file);

 else

 //將元素放入隊尾 如果隊列滿 則阻塞

 queue put(file);

 }

 }

 }

 class SearchTask implements Runnable {

 private BlockingQueue<File> queue;

 private String keyword;

 public SearchTask(BlockingQueue<File> queue String keyword) {

 this queue = queue;

 this keyword = keyword;

 }

 public void run() {

 try {

 boolean done = false;

 while (!done) {

 //取出隊首元素 如果隊列為空 則阻塞

 File file = queue take();

 if (file == FileEnumerationTask DUMMY) {

 //取出來後重新放入 好讓其他線程讀到它時也很快的結束

 queue put(file);

 done = true;

 } else

 search(file);

 }

 } catch (IOException e) {

 e printStackTrace();

 } catch (InterruptedException e) {

 }

 }

 public void search(File file) throws IOException {

 Scanner in = new Scanner(new FileInputStream(file));

 int lineNumber = ;

 while (in hasNextLine()) {

 lineNumber++;

 String line = in nextLine();

 if (ntains(keyword))

 System out printf( %s:%d:%s%n file getPath() lineNumber

 line);

 }

 in close();

 }

lishixinzhi/Article/program/Java/hx/201311/26657