CompletableFuture で遊ぶ
あけましておめでとうございます!
ずいぶん遅い新年の挨拶となりますが本年もよろしくお願いします。
それでは新年1発目のネタは私の愛する Java です!
Java 8 の新機能である CompletableFuture の詳細が下記サイトで解説されはじめました。
詳解 Java SE 8 第19回 Concurrency Utilitiesのアップデート その1
Java 8 が正式リリースされる前から気にはなっていたのですが海外のサイトでも情報量が少なく英語がよく解らないので正しい使い方が解らずにいました。
おまけに Java API ドキュメントもその当時は日本語のものはありませんでした。
現在は日本語の API ドキュメントも用意されているのでうれしい限りです。
それではとりあえず何か適当にプログラムを組んでみることにします。
お正月ということでちょっとふざけた内容としました。
ネタがネタだけにごめんなさいと先に行っておきます。(ヲヒ
今回のネタは CompletableFuture を使ってプログラムを高速化するです。(そんな大袈裟なものじゃないし、使わなくても可能なのは秘密です。)
あなたは、ある IT 企業に勤めています。今回あるプロジェクトのリーダーを任されました。
あなたは何人かメンバーを選出しなければなりません。
そこで希望者を募ったところ、たくさんの魅力的な女性プログラマ達があなたを取り囲みました。
予想外の出来事にあなたは大喜びで全員をプロジェクトチームに加えようとしましたが・・・
なんと女性達は隣の女性をチームに加えるなら私は辞退すると言います。
たとえば、A子、B子、C子、D子、E子 と言う具合にあなたを中心に取り囲んでいるとしたら
A子をメンバーに加えると、B子、E子はチームに加えることは出来ないということです。
B子をメンバーに加えると、A子と C子はチームに加えることはできない。
困りました。
そこであなたは胸ポケットから「おっぱいスカウター」という秘密兵器を取り出し女性達のバストを計測できるようにしました。
そう、あなたは、おっぱい星人だったのです。
それを利用してチームに加える女性達のバストの合計値が最大になるように選出することにします。
これからもこういうことがちょくちょくあるかもしれないのであなたはプログラムを組むことにします。
さて、あなたならどんなプログラムを組むでしょう?(Java 8 で組むこと)
女性プログラマ達のバストのサイズは配列に乱数を生成して格納します。
配列の要素数は女性プログラマの人数となります。
さて、あなたならどんなコードを書くでしょうか?
最も簡単な例は次のようなコードになると思います。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
package jp.yucchi.oppaialien; import java.security.SecureRandom; import java.util.Random; import java.util.stream.Stream; /** * * @author Yucchi */ public class OppaiGenerator { Random random = new SecureRandom(); int[] oppai = Stream.generate(random::nextDouble) .mapToDouble(e -> e) .filter(e -> (e >= 0.7 && e < 1.2)) .mapToInt(e -> (int) (e * 100)) .limit(7) .toArray(); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
package jp.yucchi.oppaialien; import java.security.SecureRandom; import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.IntStream; /** * * @author Yucchi */ class OppaiSearch { private final int option; private final int[] oppai; private int result; OppaiSearch(int option, int[] oppai) { this.option = option; this.oppai = oppai; } public int getOppaiTask() { Random random = new SecureRandom(); int GIRLS = oppai.length; int[] dp = new int[GIRLS]; IntStream.range(0, GIRLS - 1).forEach(e -> { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(1_000)); } catch (InterruptedException ex) { Logger.getLogger(OppaiSearch.class.getName()).log(Level.SEVERE, null, ex); } dp[e] = oppai[e + option]; if (e > 0) { dp[e] = Math.max(dp[e], dp[e - 1]); } if (e > 1) { dp[e] = Math.max(dp[e], dp[e - 2] + oppai[e + option]); } result = Math.max(result, dp[e]); System.out.println(Thread.currentThread().getName() + " : " + result); }); return result; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package jp.yucchi.oppaialien; import java.util.Arrays; /** * * @author Yucchi */ public class OppaiAlien { /** * @param args the command line arguments */ public static void main(String[] args) { OppaiGenerator oppaiGenerator = new OppaiGenerator(); System.out.println(Arrays.toString(oppaiGenerator.oppai) + "\n"); OppaiSearch oppai_0 = new OppaiSearch(0, oppaiGenerator.oppai); OppaiSearch oppai_1 = new OppaiSearch(1, oppaiGenerator.oppai); int maxOppai_0 = oppai_0.getOppaiTask(); int maxOppai_1 = oppai_1.getOppaiTask(); System.out.println(Thread.currentThread().getName() + "おっぱいのサイズの最大総和は " + Math.max(maxOppai_0, maxOppai_1) + "です。"); } } |
デバッグ用に必要の無いものがありますが気にしないでください。
一見、実にシンプルでいて合理的なように見えます。
それでは動かしてみます。
順番に処理されているのが解ります。
処理時間は OppaiSearch クラスの getOppaiTask() メソッドの中のループ処理中にランダムなスリープを少し挟んでいるので気にしないでください。
このプログラムを高速化するには getOppaiTask() メソッドを並行処理してしまうのが手っ取り早いですね!
そこで CompletableFuture を使って楽に高速化してみます。
OppaiAlien クラスを次のように変更します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
package jp.yucchi.oppaialien; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Level; import java.util.logging.Logger; /** * * @author Yucchi */ public class OppaiAlien { private static Integer result; /** * @param args the command line arguments */ public static void main(String[] args) { OppaiGenerator oppaiGenerator = new OppaiGenerator(); System.out.println(Arrays.toString(oppaiGenerator.oppai) + "\n"); OppaiSearch oppai_0 = new OppaiSearch(0, oppaiGenerator.oppai); OppaiSearch oppai_1 = new OppaiSearch(1, oppaiGenerator.oppai); CompletableFuture<Integer> future0 = CompletableFuture.supplyAsync(() -> oppai_0.getOppaiTask()); CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> oppai_1.getOppaiTask()); // 両方の処理が終わってから計算値が大きい方を返す。 CompletableFuture<Integer> f = future0.thenCombine(future1, (f0, f1) -> { System.out.println(Thread.currentThread().getName() + " : CompletableFuture<Integer> f"); return Math.max(f0, f1); }); try { result = f.get(3, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException ex) { Logger.getLogger(OppaiAlien.class.getName()).log(Level.SEVERE, null, ex); } catch (TimeoutException ex) { f.complete(-1); try { result = f.get(); } catch (InterruptedException | ExecutionException ex1) { Logger.getLogger(OppaiAlien.class.getName()).log(Level.SEVERE, null, ex1); } } if (result < 0) { System.out.println("Timeout. Please use a more high-performance computer"); } else { System.out.println(Thread.currentThread().getName() + "おっぱいのサイズの最大総和は " + result + "です。"); } } } |
見慣れないコードがありますね。
次のように supplyAsync() メソッドを使って CompletableFutureオブジェクトを生成します。
CompletableFuture<Integer> future0 = CompletableFuture.supplyAsync(() -> oppai_0.getOppaiTask());
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> oppai_1.getOppaiTask());
そして supplyAsync() メソッドで作った二つの CompletableFutureオブジェクト を非同期で処理させます。
supplyAsync() メソッドは API ドキュメントでは次のように説明されています。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
ForkJoinPool.commonPool()で実行されているタスクが指定されたサプライヤを呼び出して取得した値を使用して非同期的に完了する新しいCompletableFutureを返します。
それでは次にこれらから得られる値の大きなほうが最終的な結果となります。
実はこのプログラムはちょっと遊びが入っているので本来の目的だけを達成するには次のコードで完了させることができます。
try {
System.out.println(“おっぱいのサイズの最大総和は ” + Math.max(future0.get(), future1.get()) + “です。”);
System.out.println(“プログラムを終了します。”);
} catch (InterruptedException | ExecutionException ex) {
Logger.getLogger(OppaiAlien.class.getName()).log(Level.SEVERE, null, ex);
}
get() メソッドは処理が完了するまで待つので両方の非同期処理の結果を取得してから Math.max() メソッドは実行されます。
このプログラムでは thenCombine() メソッドを使って、CompletableFuture<Integer> future0 と CompletableFuture<Integer> future1 の両方の処理が終わるまで待って
それらを使って処理をして結果を返すようにしています。
// 両方の処理が終わってから計算値が大きい方を返す。
CompletableFuture<Integer> f = future0.thenCombine(future1, (f0, f1) -> {
System.out.println(Thread.currentThread().getName() + ” : CompletableFuture<Integer> f”);
return Math.max(f0, f1);
});
thenCombine() メソッドは API ドキュメントによると次のように説明されています。
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
このステージと指定された他のステージの両方が正常終了した際に実行される新しいCompletionStageを返します(実行時には、指定された関数の引数として2つの結果が使用される)。
それでは先に進みましょう。
このプログラムではさらに余計なことをしています。
get() メソッドにタイムリミットを設定しました。(^_^;)
result = f.get(3, TimeUnit.SECONDS);
タイムアウトしたら
f.complete(-1);
と、CompletableFuture<Integer> f に –1 を設定します。
実はこのプログラムではこんなことをせずにタイムアウトが発生したら result に –1 を代入すればいいだけなんですが complete() メソッドを使いたかったからこうなっただけです。( ̄。 ̄;)
それではこのプログラムの実行結果を見てみましょう。
Fork/Join Framework が使われて並行処理されているのが解ります。
また、両方の処理が完了されてから最終的な処理もされているのが確認できます。
今回の目的である並行処理によるプログラムの高速化を CompletableFuture を使ってすることができました。(^_^)
ただ、これでいいのか?それとももっと良い使い方があるのかは不明です。
これからの 詳解 Java SE 8 第19回 Concurrency Utilitiesのアップデート の記事に注目していきましょう!
ここからはおまけです。
今回のプログラムでは両方の処理が完了するのを待ってました。
先に終了した方を表示するだけの場合も試してみます。
最終処理をこちらに変更します。
applyToEither() メソッドを使います。
これはどちらかが処理結果を得られれば、その結果を指定された関数に渡します。
// 処理が早く終わった方を返す。
CompletableFuture<Integer> f = future0.applyToEither(future1, x -> {
return x;
});
applyToEither() メソッド は API ドキュメントでは次のように説明されています。
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,
Function<? super T,U> fn)
このステージまたは指定された他のステージが正常に完了したときに、対応する結果を指定された関数への引数に設定して実行される新しいCompletionStageを返します。
それでは実行結果を見てみましょう。
まだ片方しか処理が終わってないのに最終処理がされているのが確認できます。
目的のプログラムとしては駄目ですが、あくまで参考ということで!
間違い、もしくはもっと COOL な方法があれば教えていただければ幸いです。
TAGS: Java | 2015年1月18日2:38 PM | Comments : 2