CompletableFuture を解らないまま試してみた
Java8 の目玉機能は並行処理を容易にするために採用された Lambda に注目が集まってます。
その他にも便利な機能が追加されているのですが何故か人気の無い? CompletableFuture を試してみました。
これも外国語のサイトからの情報を元に解らないままゴニョゴニョしてみました。
ちなみに CompletableFuture の情報はグーグル先生に聞いてみても新しい情報は数件しかヒットしませんでした。
誰かが日本語で優しく解説してくれるのを待っていたのですが Lambda の人気の高さからか見向きもされない可愛そうな子となってます。(ヲヒ
CompletableFuture っていったい何者なの?
これまでは非同期処理の返り値を取るためには ExecutorService、 Callable、Future インターフェースを使っていました。
この CompletableFuture を使えばそれらを簡単にできちゃうよって優れもののようです。
それでは試してみましょう。
だいたいの流れをサクッと説明すると、
それぞれ、countHoge() , countFuga() , countChome() と言うメソッドで時間のかかる計算結果を取得するものとします。
それぞれ 1 秒、3 秒、7 秒と計算時間を要します。
返り値は、1 、3 、7 とします。
もちろん ExecutorService を利用します。
それぞれ CompletableFuture.supplyAsync() にて CompletableFuture を生成します。
CompletableFuture.allOf() メソッドにてそれぞれをひとまとめにします。
thenRun() メソッドにて 処理を開始し、計算結果を返します。
あとは get() メソッドにて計算結果を取得すればいいだけです。
返り値を利用して新たに CompletableFuture<T> を作ることも可能です。
この CompletableFuture クラスには 50 種類ほどのメソッドが用意されてるのでかなり自由度が高くいろんなことができそうです。
ちなみに良く解らないのに適当に創ったコードなので間違いがあると思いますが笑って許してくださいませ。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package jp.yucchi.trycompletablefuture; public class SomeWorks { public int countHoge() { try {Thread.sleep(1_000);}catch(InterruptedException e){throw new RuntimeException(e);} return 1; } public int countFuga() { try {Thread.sleep(3_000);}catch(InterruptedException e){throw new RuntimeException(e);} return 3; } public int countChome() { try {Thread.sleep(7_000);}catch(InterruptedException e){throw new RuntimeException(e);} return 7; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
package jp.yucchi.trycompletablefuture; import java.lang.management.ManagementFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; public class TryCompletableFuture { private static final SomeWorks works = new SomeWorks(); private static ExecutorService executor; private static CompletableFuture<Integer> hoge; private static CompletableFuture<Integer> fuga; private static CompletableFuture<Integer> chome; private static CompletableFuture<Void> doAllWorks; public static void main(String[] args) { int procs = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors(); if (procs < 3) { procs = 3; } executor = Executors.newFixedThreadPool(procs); hoge = CompletableFuture.supplyAsync(() -> works.countHoge(), executor); fuga = CompletableFuture.supplyAsync(() -> works.countFuga(), executor); chome = CompletableFuture.supplyAsync(() -> works.countChome(), executor); doAllWorks = CompletableFuture.allOf(hoge, fuga, chome); workCompletableFutures(); try { System.out.println(hoge.get()); } catch (ExecutionException | InterruptedException ex) { Logger.getLogger(TryCompletableFuture.class.getName()).log(Level.SEVERE, null, ex); } try { System.out.println(fuga.get()); } catch (InterruptedException | ExecutionException ex) { Logger.getLogger(TryCompletableFuture.class.getName()).log(Level.SEVERE, null, ex); } try { System.out.println(chome.get()); } catch (InterruptedException | ExecutionException ex) { Logger.getLogger(TryCompletableFuture.class.getName()).log(Level.SEVERE, null, ex); } CompletableFuture<String> monad = hoge.thenApply(r -> r * r * Math.PI).thenApply(Object::toString); try { System.out.println("monad = " + monad.get()); } catch (InterruptedException | ExecutionException ex) { Logger.getLogger(TryCompletableFuture.class.getName()).log(Level.SEVERE, null, ex); } while (true) { if (doAllWorks.isDone()) { System.out.println("executor.shutdown()"); executor.shutdown(); try { if (!executor.awaitTermination(20, TimeUnit.SECONDS)) { System.out.println("executor.shutdownNow()"); executor.shutdownNow(); break; } } catch (InterruptedException ex) { Logger.getLogger(TryCompletableFuture.class.getName()).log(Level.SEVERE, null, ex); executor.shutdownNow(); break; } break; } } } private static void workCompletableFutures() { long startTime = System.nanoTime(); System.out.println("<--- START --->"); doAllWorks.thenRun(() -> { try { if ((hoge.get() + fuga.get() + chome.get()) == 11) { System.out.println("<--- DONE --->"); } else { System.out.println("<--- ERROR --->"); } } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } long time = System.nanoTime() - startTime; System.out.println("処理時間は、" + (int) (time * 1e-9) / 3_600 + "時間" + (int) ((time * 1e-9) / 60) % 60 + "分" + (int) (time * 1e-9 % 60) + "秒" + Double.toString((time * 1e-9 % 60) % 1).substring(2)); }); } } |
出力結果は次のようになります。
<— START —>
1
3
7
<— DONE —>
処理時間は、0時間0分6秒9916577070000008
monad = 3.141592653589793
executor.shutdown()
処理時間からそれぞれの処理が平行実行されていることが解りますね。(^_^)
プロファイラで確認してみました。
thenRun() メソッドを使用すると上記のような結果となりますが
thenRunAsync() メソッドというものもあります。
これは ForkJoinPool.commonPool() を使っているようです。
若干 thenRun() メソッドと動作が違います。
こちらはメソッド内の処理が ExecutorService に全て乗らない場合があります。
登録されたタスクが終了した時点で shutdown 開始するとshutdown 開始までに ExecutorService に乗っていれば当然処理されるのですがタイミングが悪いと間に合わず未処理となってしまいます。
37 行目から 60 行目の計算結果取得処理をコメントアウトしてそれぞれ実行してみると良く解ります。
thenRun() メソッドの場合は次のように全て実行され出力されます。
<— START —>
<— DONE —>
executor.shutdown()
処理時間は、0時間0分6秒9965874390000007
thenRunAsync() メソッドを使用した場合は次のように一部処理されずプログラムが終了する場合があります。
<— START —>
executor.shutdown()
<— DONE —>
このあたりは微妙に違うので注意が必要ですね。
ついでに、thenRunAsync() メソッドを使用した場合のプロファイラでみてみました。
ForkJoinPool.commonPool() が存在してますね。
これらの使い分けのシチュエーションが思いつきません。(^_^;)
さて、時間のかかる処理をするのはいいけどあまりにも時間がかかりすぎた場合はデフォルト値を返すようにしたい場合がありますよね。
そんな時のために get(long timeout, TimeUnit unit) メソッドと complete(T value) メソッドが用意されてます。
プログラムに次のコードを追加して動かしてみましょう。
35 36 37 38 39 40 41 42 |
workCompletableFutures(); try { System.out.println(fuga.get(2, TimeUnit.SECONDS)); } catch (InterruptedException | ExecutionException | TimeoutException ex) { Logger.getLogger(TryCompletableFuture.class.getName()).log(Level.SEVERE, null, ex); fuga.complete(99); } |
fuga.get(2, TimeUnit.SECONDS) 2 秒でタイムアウトして fuga.complete(99) で fuga に 99 を設定します。
もちろん例外もスローされます。
ではプログラムを実行して確認してみましょう。
<— START —>
1
99
7 08, 2013 12:13:28 午前 jp.yucchi.trycompletablefuture.TryCompletableFuture main
SEVERE: null
java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedAwaitDone(CompletableFuture.java:376)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1555)
at jp.yucchi.trycompletablefuture.TryCompletableFuture.main(TryCompletableFuture.java:39)
<— ERROR —>
7
処理時間は、0時間0分6秒9896285360000006
monad = 3.141592653589793
executor.shutdown()
ちゃんとタイムアウトして例外を投げてそして fuga に 99 をセットしてますね。
fuga の値が 99 になることによって hoge, fuga, chome の合計値が 11 と等しくならないので <— ERROR —> と判定され出力されてます。
もう、ExecutorService、 Callable、Future インターフェース の時代はオワコンかな?
ざっと確認したけど間違ってる可能性が高いので日本語で誰かが詳しく解説してくれるのを待つとしましょうか。
かなり CompletableFuture っていけてる気がします!
Java8 早くリリースしておくれ!
TAGS: Java | 2013年7月8日12:14 AM
Trackback URL