歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Java並行編程–從並行任務集獲取反饋

Java並行編程–從並行任務集獲取反饋

日期:2017/3/1 10:25:30   编辑:Linux編程

在並行任務啟動後,強制性地從並行任務得到反饋。

假想有一個程序,可以發送批郵件,還使用了多線程機制。你想知道有多少郵件成功發送嗎?你想知道在實際發送過程期間,這個批處理工作的實時進展嗎?

要實現多線程的這種反饋,我們可以使用Callable接口。此接口的工作方式基本上與Runnable相同,但是執行方法(call())會返回一個值,該值反映了執行計算的結果。

  1. package com.ricardozuasti;
  2. import java.util.concurrent.Callable;
  3. public class FictionalEmailSender implements Callable<Boolean>{
  4. private String to;
  5. private String subject;
  6. private String body;
  7. public FictionalEmailSender(String to, String subject, String body){
  8. this.to = to;
  9. this.subject = subject;
  10. this.body = body;
  11. }
  12. @Override
  13. public Boolean call() throws InterruptedException {
  14. // 在0~0.5秒間模擬發送郵件
  15. Thread.sleep(Math.round(Math.random()*0.5*1000));
  16. // 假設我們有80%的幾率成功發送郵件
  17. if(Math.random()>0.2){
  18. return true;
  19. }else{
  20. return false;
  21. }
  22. }
  23. }

注意:Callable接口可用於返回任意數據類型,因此我們的任務可以返回我們需要的任何信息。

現在,我們使用一個線程池ExecutorService來發送郵件,由於我們的任務是以Callable接口實現的,我們提交執行的每個新任務,都會得到一個Future引用。注意我們要使用直接的構造器創建ExecutorService,而不是使用來自Executors的工具方法創建。這是因為使用指定類ThreadPoolExecutor提供了一些方法可以派上用場。

  1. package com.ricardozuasti;
  2. import java.util.concurrent.Future;
  3. import java.util.concurrent.LinkedBlockingQueue;
  4. import java.util.concurrent.ThreadPoolExecutor;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. public class Concurrency2 {
  9. public static void main(String[] args){
  10. try{
  11. ThreadPoolExecutor executor = new ThreadPoolExecutor(30, 30, 1,
  12. TimeUnit.SECONDS, new LinkedBlockingQueue());
  13. List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(9000);
  14. // 發送垃圾郵件, 用戶名假設為4位數字
  15. for(int i=1000; i<10000; i++){
  16. futures.add(executor.submit(new FictionalEmailSender(i+"@sina.com",
  17. "Knock, knock, Neo", "The Matrix has you...")));
  18. }
  19. // 提交所有的任務後,關閉executor
  20. System.out.println("Starting shutdown...");
  21. executor.shutdown();
  22. // 每秒鐘打印執行進度
  23. while(!executor.isTerminated()){
  24. executor.awaitTermination(1, TimeUnit.SECONDS);
  25. int progress = Math.round((executor.getCompletedTaskCount()
  26. *100)/executor.getTaskCount());
  27. System.out.println(progress + "% done (" +
  28. executor.getCompletedTaskCount() + " emails have been sent).");
  29. }
  30. // 現在所有郵件已發送完, 檢查futures, 看成功發送的郵件有多少
  31. int errorCount = 0;
  32. int successCount = 0;
  33. for(Future<Boolean> future : futures){
  34. if(future.get()){
  35. successCount++;
  36. }else{
  37. errorCount++;
  38. }
  39. }
  40. System.out.println(successCount + " emails were successfully sent, but " +
  41. errorCount + " failed.");
  42. }catch(Exception ex){
  43. ex.printStackTrace();
  44. }
  45. }
  46. }
Copyright © Linux教程網 All Rights Reserved