Java 8 時代の ConcurrentHashMap で遊んでみた。
またまた Blog の更新をさぼっています。
と言うことで適当なエントリーを書いてみます。
ConcurrentHashMap
J2SE5.0 で java.util.concurrent に ConcurrentHashMap が導入されました。
ConcurrentHashMap は容易に安全に並行処理を実現するために作られました。
Collections.synchronizedMap ってのもあるのですが、それは使い方が微妙で詳しい説明は省略しますが Iterator などの反復処理で ConcurrentModificationException をスローすることがあります。
Collections.synchronizedMap は、Map のロックを取得すると他のスレッドからアクセスを禁じられます。
ConcurrentHashMap だと、Iterator および Enumeration は、ある時点または反復子/列挙の作成以降のハッシュテーブルの状態を反映する要素を返すように設計されているため、
ConcurrentModificationException をスローすることはありません。
しかし、一度に 1 つのスレッドだけしか反復子を使用できません。
あと、HashMap はエントリーのキーや値にnullを使用できますが、ConcurrentHashMapでは使用できません。
NullPointerException が投げられます。
ConcurrentHashMap は、HashMap のようなハッシュを使った Map ですがロックストライピングと言われる粒度の小さなロック方式を採用しています。
つまり、Map 全体をロックして排他アクセスする必要がなければ ConcurrentHashMap は、とても魅力的な存在となります。
しかし、J2SE5.0 の時には、ConcurrentHashMap を便利に使うにはまだまだパーツが揃っていませんでした。
便利に使えそうだったのはアトミックに実行される下記メソッドくらいでした。
public V putIfAbsent(K key, V value)
public boolean remove(Object key, Object value)
public V replace(K key, V value)
public boolean replace(K key, V oldValue, V newValue)
また古い J2SE5.0 ネタかと思わせて、ここから最新の Java SE 8 ネタです。
Java SE 8 で ConcurrentHashMap に追加された便利な機能をいくつか試してみます。
はじめに、ConcurrentHashMap というネーミングから何をやってもスレッドセーフだと思い込んでしまうと泣きます。
例えば、下記プログラムのように複数スレッドから ConcurrentHashMap の値の更新があるとします。
最終的に Key “HOGE”, Value は 80 となることを期待しますが残念なことになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package jp.yucchi.concurrenthashmapexample0; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * * @author Yucchi */ class Counter { ConcurrentHashMap<String, Integer> map; private Random random = new Random(); public Counter(ConcurrentHashMap<String, Integer> map) { this.map = map; } void incrementCounter() { for (int i = 0; i < 10; i++) { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(50)); } catch (InterruptedException ex) { Logger.getLogger(Counter.class.getName()).log(Level.SEVERE, null, ex); } int counter = map.get("HOGE"); map.put("HOGE", ++counter); } System.out.println(Thread.currentThread().getName() + ": HOGE, " + map.get("HOGE")); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
package jp.yucchi.concurrenthashmapexample0; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** * * @author Yucchi */ public class ConcurrentHashMapExample0 { public static void main(String[] args) { ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<>(); count.put("HOGE", 0); Counter counter = new Counter(count); CompletableFuture<Void> cnt_1 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_2 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_3 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_4 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_5 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_6 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_7 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_8 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> allTask = CompletableFuture.allOf(cnt_1, cnt_2, cnt_3, cnt_4, cnt_5, cnt_6, cnt_7, cnt_8); allTask.join(); if (allTask.isDone()) { count.forEach(1, (key, value) -> { System.out.println(Thread.currentThread().getName() + " : " + key + ", " + value); }); } } } |
このプログラムの実行結果は次のようになります。
ForkJoinPool.commonPool-worker-18: HOGE, 48
ForkJoinPool.commonPool-worker-29: HOGE, 58
ForkJoinPool.commonPool-worker-8: HOGE, 65
ForkJoinPool.commonPool-worker-4: HOGE, 67
ForkJoinPool.commonPool-worker-11: HOGE, 68
ForkJoinPool.commonPool-worker-15: HOGE, 70
ForkJoinPool.commonPool-worker-25: HOGE, 73
ForkJoinPool.commonPool-worker-22: HOGE, 75
main : HOGE, 75
更新処理まではスレッドセーフでないということですね。
それではどのように対処すればいいでしょうか?
java.util.concurrent.atomic.LongAdder を使ってみます。
LongAdder クラスは、プリミティブ型に対するアトミック操作を行うため Java SE 8 から追加されたクラスです。
LongAdder クラスには、add(1)と等価な increment() メソッドと、add(-1)と等価な decrement() メソッドがあります。
LongAdder クラスを使って修正したプログラムは下記のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package jp.yucchi.concurrenthashmapexample1; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import java.util.logging.Level; import java.util.logging.Logger; /** * * @author Yucchi */ class Counter { ConcurrentHashMap<String, LongAdder> map; private Random random = new Random(); public Counter(ConcurrentHashMap<String, LongAdder> map) { this.map = map; } void incrementCounter() { for (int i = 0; i < 10; i++) { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(50)); } catch (InterruptedException ex) { Logger.getLogger(Counter.class.getName()).log(Level.SEVERE, null, ex); } map.get("HOGE").increment(); } System.out.println(Thread.currentThread().getName() + ": HOGE, " + map.get("HOGE")); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.LongAdder; /** * * @author Yucchi */ public class ConcurrentHashMapExample1 { public static void main(String[] args) { ConcurrentHashMap<String, LongAdder> count = new ConcurrentHashMap<>(); count.put("HOGE", new LongAdder()); Counter counter = new Counter(count); CompletableFuture<Void> cnt_1 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_2 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_3 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_4 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_5 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_6 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_7 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_8 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> allTask = CompletableFuture.allOf(cnt_1, cnt_2, cnt_3, cnt_4, cnt_5, cnt_6, cnt_7, cnt_8); allTask.join(); if (allTask.isDone()) { count.forEach(1, (key, value) -> { System.out.println(Thread.currentThread().getName() + " : " + key + ", " + value); }); } } } |
このプログラムの実行結果は次のようになります。
ForkJoinPool.commonPool-worker-25: HOGE, 49
ForkJoinPool.commonPool-worker-18: HOGE, 63
ForkJoinPool.commonPool-worker-11: HOGE, 71
ForkJoinPool.commonPool-worker-4: HOGE, 72
ForkJoinPool.commonPool-worker-29: HOGE, 73
ForkJoinPool.commonPool-worker-8: HOGE, 78
ForkJoinPool.commonPool-worker-15: HOGE, 79
ForkJoinPool.commonPool-worker-22: HOGE, 80
main : HOGE, 80
期待通りにうごいてくれました。
でも、ConcurrentHashMap<String, LongAdder> とかに変更するのも面倒ですよね。
それでは、Java SE 8 でのもっと素敵な修正プログラムを紹介します。
下記のようなアトミック操作をするメソッドを利用します。
public V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
public V computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
public V merge(K key, V value, BiFunction<? super V,? super V,? extends V> remappingFunction)
compute メソッドは、指定されたキーと現在マップされている値に対するマッピングの計算を試みます(現在のマッピングが存在しない場合はnull
)。
computeIfPresent メソッドは、指定されたキーの値が存在する場合、キーと現在マップされている値から新しいマッピングの計算を試みます。
merge メソッドは、指定されたキーがまだ(nullでない)値と関連付けられていない場合は、指定された値に関連付けます。それ以外の場合は、指定された再マッピング関数の結果で値を置換し、null
の場合は削除します。
今回は、computeIfPresent メソッドを使ってみました。
他のメソッドの使用例はコメントアウトして記述してあります。
プログラムは下記のようになります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
package jp.yucchi.concurrenthashmapexample2; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * * @author Yucchi */ class Counter { ConcurrentHashMap<String, Integer> map; private Random random = new Random(); public Counter(ConcurrentHashMap<String, Integer> map) { this.map = map; } void incrementCounter() { for (int i = 0; i < 10; i++) { try { TimeUnit.MILLISECONDS.sleep(random.nextInt(50)); } catch (InterruptedException ex) { Logger.getLogger(Counter.class.getName()).log(Level.SEVERE, null, ex); } map.computeIfPresent("HOGE", (k, v) -> ++v); // map.compute("HOGE", (k, v) -> ++v); // map.merge("HOGE", 1, Integer::sum); } System.out.println(Thread.currentThread().getName() + ": HOGE, " + map.get("HOGE")); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package jp.yucchi.concurrenthashmapexample2; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** * * @author Yucchi */ public class ConcurrentHashMapExample2 { /** * @param args the command line arguments */ public static void main(String[] args) { ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<>(); count.put("HOGE", 0); Counter counter = new Counter(count); CompletableFuture<Void> cnt_1 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_2 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_3 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_4 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_5 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_6 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_7 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> cnt_8 = CompletableFuture.runAsync(() -> counter.incrementCounter()); CompletableFuture<Void> allTask = CompletableFuture.allOf(cnt_1, cnt_2, cnt_3, cnt_4, cnt_5, cnt_6, cnt_7, cnt_8); allTask.join(); if (allTask.isDone()) { count.forEach(1, (key, value) -> { System.out.println(Thread.currentThread().getName() + " : " + key + ", " + value); }); } } } |
このプログラムの実行結果は次のようになります。
ForkJoinPool.commonPool-worker-4: HOGE, 59
ForkJoinPool.commonPool-worker-15: HOGE, 61
ForkJoinPool.commonPool-worker-18: HOGE, 62
ForkJoinPool.commonPool-worker-22: HOGE, 68
ForkJoinPool.commonPool-worker-11: HOGE, 74
ForkJoinPool.commonPool-worker-25: HOGE, 78
ForkJoinPool.commonPool-worker-8: HOGE, 79
ForkJoinPool.commonPool-worker-29: HOGE, 80
main : HOGE, 80
これまでのところは J2SE5.0 でも注意が必要で replace メソッドを使う方法、AtomicLong を使うなどの方法がありました。
Java SE 8 では綺麗にその問題を解決できるようになりましたね!
それでは、これから Java SE 8 で追加されたスレッドセーフなバルクオペレーションを試してみます。
下記のプログラムを組んで、Java SE 8 で ConcurrentHashMap に追加された機能の一部を試してみました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
package jp.yucchi.concurrenthashmapexample3; import java.lang.management.ManagementFactory; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ForkJoinPool; /** * * @author Yucchi */ public class ConcurrentHashMapExample3 { public static void main(String[] args) { // CPU System.out.println("CPU: " + ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors()); // -Djava.util.concurrent.ForkJoinPool.common.parallelism=7 System.out.println("ForkJoinPoolParallelism: " + ForkJoinPool.getCommonPoolParallelism() + System.getProperty("line.separator")); ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); map.put("JDK 1.1.4", "Sparkler"); map.put("JDK 1.1.5", "Pumpkin"); map.put("JDK 1.1.6", "Abigail"); map.put("JDK 1.1.7", "Brutus"); map.put("JDK 1.1.8", "Chelsea"); map.put("J2SE 1.2", "Playground"); map.put("J2SE 1.2.1", "null"); map.put("J2SE 1.2.2", "Cricket"); map.put("J2SE 1.3", "Kestrel"); map.put("J2SE 1.3.1", "Ladybird"); map.put("J2SE 1.4", "Merlin"); map.put("J2SE 1.4.1", "Hopper"); map.put("J2SE 1.4.2", "Mantis"); map.put("J2SE 5.0", "Tiger"); map.put("Java SE 6", "Mustang"); map.put("Java SE 7", "Dolphin"); map.put("Java SE 8", "null"); map.put("Java SE 9", ""); System.out.println("マッピング数: " + map.mappingCount() + System.getProperty("line.separator")); System.out.println("(○・ω・)ノ------------- forEach (BiConsumer) -------------"); map.forEach(1, (key, value) -> System.out.println(Thread.currentThread().getName() + ": " + key + " = " + value)); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- forEach (BiFunction, Consumer) -------------"); map.forEach(1, (key, value) -> key.length() > 7 && value.length() > 7 ? Thread.currentThread().getName() + ": " + key + " = " + value : null, System.out::println); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- forEachEntry (Function Consumer)-------------"); map.forEachEntry(1, entry -> entry.getKey().length() > 7 && entry.getValue().length() > 7 ? Thread.currentThread().getName() + ": " + entry.getKey() + " = " + entry.getValue() : null, System.out::println); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- search -------------"); String searchResult = map.search(1, (key, value) -> key.length() > 7 && value.length() > 7 ? key + " = " + value : null); System.out.println("searchResult: " + searchResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- searchKeys -------------"); String searchKeysResult = map.searchKeys(1, key -> key.length() > 7 ? key : null); System.out.println("searchKeysResult: " + searchKeysResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- searchValues -------------"); String searchValuesResult = map.searchValues(1, value -> value.length() > 7 ? value : null); System.out.println("searchValuesResult: " + searchValuesResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- searchEntries -------------"); String searchEntriesResult = map.searchEntries(1, entry -> entry.getKey().length() > 7 && entry.getValue().length() > 7 ? entry.getKey() + " = " + entry.getValue() : null); System.out.println("searchEntriesResult: " + searchEntriesResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- reduce -------------"); String reduceResult = map.reduce(1, (key, value) -> key + " = " + value, (s1, s2) -> s1 + System.getProperty("line.separator") + s2); System.out.println("reduceResult" + System.getProperty("line.separator") + reduceResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- reduceKeys -------------"); String reduceKeysResult = map.reduceKeys(1, key -> key.length() > 9 ? key : null, (s1, s2) -> s1 + System.getProperty("line.separator") + s2); System.out.println("reduceKeysResult" + System.getProperty("line.separator") + reduceKeysResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- reduceValues -------------"); String reduceValuesResult = map.reduceValues(1, value -> value.length() > 4 && value.length() < 7 ? value : null, (s1, s2) -> s1 + System.getProperty("line.separator") + s2); System.out.println("reduceValuesResult" + System.getProperty("line.separator") + reduceValuesResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- reduceEntries -------------"); String reduceEntriesResult = map.reduceEntries(1, entry -> entry.getKey().length() > 7 && entry.getValue().length() > 7 ? entry.getKey() + " = " + entry.getValue() : null, (s1, s2) -> s1 + System.getProperty("line.separator") + s2); System.out.println("reduceEntriesResult" + System.getProperty("line.separator") + reduceEntriesResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- reduceMaxValuesResult -------------"); int reduceMaxValuesResult = map.reduceValuesToInt(1, String::length, 0, Integer::max); System.out.println(reduceMaxValuesResult); System.out.println(System.getProperty("line.separator") + "(○・ω・)ノ------------- お終い! -------------"); } } |
このプログラムの実行結果は次のようになります。
CPU: 32
ForkJoinPoolParallelism: 7
マッピング数: 18
(○・ω・)ノ————- forEach (BiConsumer) ————-
main: JDK 1.1.7 = Brutus
main: Java SE 9 =
main: JDK 1.1.8 = Chelsea
main: Java SE 8 = null
main: JDK 1.1.5 = Pumpkin
main: JDK 1.1.6 = Abigail
main: J2SE 1.2 = Playground
main: JDK 1.1.4 = Sparkler
ForkJoinPool.commonPool-worker-3: J2SE 1.2.2 = Cricket
main: Java SE 7 = Dolphin
ForkJoinPool.commonPool-worker-3: J2SE 1.3.1 = Ladybird
ForkJoinPool.commonPool-worker-4: J2SE 1.4 = Merlin
ForkJoinPool.commonPool-worker-1: J2SE 1.4.2 = Mantis
ForkJoinPool.commonPool-worker-2: J2SE 1.3 = Kestrel
main: Java SE 6 = Mustang
ForkJoinPool.commonPool-worker-5: J2SE 1.4.1 = Hopper
ForkJoinPool.commonPool-worker-4: J2SE 5.0 = Tiger
ForkJoinPool.commonPool-worker-3: J2SE 1.2.1 = null
(○・ω・)ノ————- forEach (BiFunction, Consumer) ————-
ForkJoinPool.commonPool-worker-5: J2SE 1.3.1 = Ladybird
main: J2SE 1.2 = Playground
ForkJoinPool.commonPool-worker-1: JDK 1.1.4 = Sparkler
(○・ω・)ノ————- forEachEntry (Function Consumer)————-
ForkJoinPool.commonPool-worker-1: J2SE 1.3.1 = Ladybird
ForkJoinPool.commonPool-worker-6: JDK 1.1.4 = Sparkler
ForkJoinPool.commonPool-worker-7: J2SE 1.2 = Playground
(○・ω・)ノ————- search ————-
searchResult: J2SE 1.3.1 = Ladybird
(○・ω・)ノ————- searchKeys ————-
searchKeysResult: J2SE 1.2.2
(○・ω・)ノ————- searchValues ————-
searchValuesResult: Sparkler
(○・ω・)ノ————- searchEntries ————-
searchEntriesResult: JDK 1.1.4 = Sparkler
(○・ω・)ノ————- reduce ————-
reduceResult
JDK 1.1.7 = Brutus
Java SE 9 =
JDK 1.1.8 = Chelsea
Java SE 8 = null
JDK 1.1.5 = Pumpkin
JDK 1.1.6 = Abigail
J2SE 1.2 = Playground
JDK 1.1.4 = Sparkler
Java SE 7 = Dolphin
Java SE 6 = Mustang
J2SE 1.2.2 = Cricket
J2SE 1.3.1 = Ladybird
J2SE 1.2.1 = null
J2SE 1.3 = Kestrel
J2SE 1.4 = Merlin
J2SE 5.0 = Tiger
J2SE 1.4.2 = Mantis
J2SE 1.4.1 = Hopper
(○・ω・)ノ————- reduceKeys ————-
reduceKeysResult
J2SE 1.2.2
J2SE 1.3.1
J2SE 1.2.1
J2SE 1.4.2
J2SE 1.4.1
(○・ω・)ノ————- reduceValues ————-
reduceValuesResult
Brutus
Merlin
Tiger
Mantis
Hopper
(○・ω・)ノ————- reduceEntries ————-
reduceEntriesResult
J2SE 1.2 = Playground
JDK 1.1.4 = Sparkler
J2SE 1.3.1 = Ladybird
(○・ω・)ノ————- reduceMaxValuesResult ————-
10
(○・ω・)ノ————- お終い! ————-
それでは、プログラムをざっくり追ってみましょう。
16 行目でこのプログラムを実行しているコンピュータの論理 CPU 数を取得しています。
そして、VM オプションを -Djava.util.concurrent.ForkJoinPool.common.parallelism=7 としてあるので正しく設定されているか確認を 18 行目のコードで行っています。
// CPU
System.out.println(“CPU: ” + ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors());
// -Djava.util.concurrent.ForkJoinPool.common.parallelism=7
System.out.println(“ForkJoinPoolParallelism: ” + ForkJoinPool.getCommonPoolParallelism() + System.getProperty(“line.separator”));
20 行目から 38 行目にかけて、ConcurrentHashMap を生成してデータを挿入しています。
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put(“JDK 1.1.4”, “Sparkler”);
.
.
map.put(“Java SE 9”, “”);
40 行目でマッピング数を取得しています。
System.out.println(“マッピング数: ” + map.mappingCount()
+ System.getProperty(“line.separator”));
public long mappingCount() は、API ドキュメントによると次のように書かれています。
マッピングの数を返します。ConcurrentHashMapにはintで表すことができる数より多くのマッピングが含まれている可能性があるため、
size()のかわりにこのメソッドを使用するようにしてください。返される値は推定値であり、挿入や削除が同時に行われた場合、実際のカウントは異なる可能性があります。
大きな要素を持つ場合は size() の代わりに mappingCount() を使うことができるようになったようですね。
forEach
43 行目から 57 行目までは forEach を試しています。
forEach は次のように 9 種類用意されています。
引数に Function, BiFunction のような関数を含むものもあり便利に使えます。
public void forEach(BiConsumer<? super K,? super V> action)
public void forEach(long parallelismThreshold,
BiConsumer<? super K,? super V> action)public <U> void forEach(long parallelismThreshold,
BiFunction<? super K,? super V,? extends U> transformer,
Consumer<? super U> action)public void forEachKey(long parallelismThreshold,
Consumer<? super K> action)public <U> void forEachKey(long parallelismThreshold,
Function<? super K,? extends U> transformer,
Consumer<? super U> action)public void forEachValue(long parallelismThreshold,
Consumer<? super V> action)public <U> void forEachValue(long parallelismThreshold,
Function<? super V,? extends U> transformer,
Consumer<? super U> action)public void forEachEntry(long parallelismThreshold,
Consumer<? super Map.Entry<K,V>> action)public <U> void forEachEntry(long parallelismThreshold,
Function<Map.Entry<K,V>,? extends U> transformer,
Consumer<? super U> action)
今回のプログラムでは、
public void forEach(long parallelismThreshold,
BiConsumer<? super K,? super V> action)
public <U> void forEach(long parallelismThreshold,
BiFunction<? super K,? super V,? extends U> transformer,
Consumer<? super U> action)
public <U> void forEachEntry(long parallelismThreshold,
Function<Map.Entry<K,V>,? extends U> transformer,
Consumer<? super U> action)
を使っています。
第一引数の parallelismThreshold は、このオペレーションを並列的に実行するために必要な(推定の)要素数となっています。いわゆる閾値です。
並行処理をするために第一引数の parallelismThreshold を 1 と設定しています。
それで本当に並行処理が行われているか確認のためにスレッド名を表示させています。
ちょっと注意がいるのは、public void forEach(BiConsumer<? super K,? super V> action) ですね。
これ、引数に parallelismThreshold がありません。
つまり、並行処理されません。
残りの引数は Java SE 8 ではお馴染みの関数ですから特に説明することはないでしょう。
一つだけ API ドキュメントの内容を紹介しておきます。
public <U> void forEach(long parallelismThreshold,
BiFunction<? super K,? super V,? extends U> transformer,
Consumer<? super U> action)各(キー, 値)のnullでない各変換に対し、指定されたアクションを実行します。
型パラメータ:
U – トランスフォーマの戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
transformer – 要素の変換を返す関数。変換がない場合はnull(その場合、アクションは適用されない)
action – アクション
個人的にはこれが一番使い勝手が良いかなって思っています。
それでは三つの forEach の実行結果を確認します。
はじめに第二引数に BiConsumer を持つ forEach の処理です。
map.forEach(1,
(key, value) -> System.out.println(Thread.currentThread().getName() + “: ” + key + ” = ” + value));
戻り値無しの処理でスレッド名と key, value を一つの文字列とします。(解りやすいように連結用の文字を追加しています。)
(○・ω・)ノ————- forEach (BiConsumer) ————-
main: JDK 1.1.7 = Brutus
main: Java SE 9 =
main: JDK 1.1.8 = Chelsea
main: Java SE 8 = null
main: JDK 1.1.5 = Pumpkin
main: JDK 1.1.6 = Abigail
main: J2SE 1.2 = Playground
main: JDK 1.1.4 = Sparkler
ForkJoinPool.commonPool-worker-3: J2SE 1.2.2 = Cricket
main: Java SE 7 = Dolphin
ForkJoinPool.commonPool-worker-3: J2SE 1.3.1 = Ladybird
ForkJoinPool.commonPool-worker-4: J2SE 1.4 = Merlin
ForkJoinPool.commonPool-worker-1: J2SE 1.4.2 = Mantis
ForkJoinPool.commonPool-worker-2: J2SE 1.3 = Kestrel
main: Java SE 6 = Mustang
ForkJoinPool.commonPool-worker-5: J2SE 1.4.1 = Hopper
ForkJoinPool.commonPool-worker-4: J2SE 5.0 = Tiger
ForkJoinPool.commonPool-worker-3: J2SE 1.2.1 = null
次は、引数が三つの forEach で第二引数に BiFunction を第三引数に Consumer を持っています。
map.forEach(1,
(key, value) -> key.length() > 7 && value.length() > 7 ? Thread.currentThread().getName() + “: ” + key + ” = ” + value : null,
System.out::println);
第二引数の BiFunction で key が 7 文字より長くかつ value が 7 文字より長い値をフィルタリングし、スレッド名と key, value を一つの文字列として返しています。(解りやすいように連結用の文字を追加しています。)
第三引数の Consumer ではそれらを標準出力に表示させています。
(○・ω・)ノ————- forEach (BiFunction, Consumer) ————-
ForkJoinPool.commonPool-worker-5: J2SE 1.3.1 = Ladybird
main: J2SE 1.2 = Playground
ForkJoinPool.commonPool-worker-1: JDK 1.1.4 = Sparkler
次の forEach は、forEachEntry で Map.Entry に操作を加えます。
map.forEachEntry(1,
entry -> entry.getKey().length() > 7 && entry.getValue().length() > 7 ? Thread.currentThread().getName() + “: ” + entry.getKey() + ” = ” + entry.getValue() : null,
System.out::println);
第二引数の Function で、Entry の key と value をそれぞれ取得、そして key が 7 文字より長くかつ value が 7 文字より長い値をフィルタリングし、スレッド名と key, value を一つの文字列として返しています。(解りやすいように連結用の文字を追加しています。)
第三引数の Consumer でではそれらを標準出力に表示させています。
(○・ω・)ノ————- forEachEntry (Function Consumer)————-
ForkJoinPool.commonPool-worker-1: J2SE 1.3.1 = Ladybird
ForkJoinPool.commonPool-worker-6: JDK 1.1.4 = Sparkler
ForkJoinPool.commonPool-worker-7: J2SE 1.2 = Playground
forEach だけでも至れり尽くせりで便利に使えますね。(^_^)
Function, BiFunction が使えるのって幸せです!
search
それでは search を見ていきましょう。
59 行目から 78 行目まで search を試しています。
search は 4 種類の優れたメソッドが追加されました。
それぞれ API ドキュメントの内容を見ていきましょう。
public <U> U search(long parallelismThreshold,
BiFunction<? super K,? super V,? extends U> searchFunction)指定された検索関数を各(キー、値)に適用し、nullでない結果を返します(存在しない場合はnull)。
成功した場合、その後の要素の処理は抑制され、検索関数の他の並列呼出しの結果は無視されます。型パラメータ:
U – 検索関数の戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
searchFunction – 成功した場合はnull以外の結果を返し、それ以外の場合はnullを返す関数。戻り値:
各(キー、値)に対して指定された検索関数を適用した場合のnull以外の結果。存在しない場合はnull
public <U> U searchKeys(long parallelismThreshold,
Function<? super K,? extends U> searchFunction)各キーに指定された検索関数を適用したnull以外の結果を返します。結果がない場合はnullを返します。
成功した場合、その後の要素の処理は抑制され、検索関数の他の並列呼出しの結果は無視されます。型パラメータ:
U – 検索関数の戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
searchFunction – 成功した場合はnull以外の結果を返し、それ以外の場合はnullを返す関数。戻り値:
各キーに対して指定された検索関数を適用した場合のnull以外の結果。存在しない場合はnull
public <U> U searchValues(long parallelismThreshold,
Function<? super V,? extends U> searchFunction)各値に指定された検索関数を適用したnull以外の結果を返します。結果がない場合はnullを返します。
成功した場合、その後の要素の処理は抑制され、検索関数の他の並列呼出しの結果は無視されます。
型パラメータ:
U – 検索関数の戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
searchFunction – 成功した場合はnull以外の結果を返し、それ以外の場合はnullを返す関数。戻り値:
各値に指定された検索関数を適用したnull以外の結果。存在しない場合はnull
public <U> U searchEntries(long parallelismThreshold,
Function<Map.Entry<K,V>,? extends U> searchFunction)各エントリに指定された検索関数を適用したnull以外の結果を返します。結果がない場合はnullを返します。
成功した場合、その後の要素の処理は抑制され、検索関数の他の並列呼出しの結果は無視されます。
型パラメータ:
U – 検索関数の戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
searchFunction – 成功した場合はnull以外の結果を返し、それ以外の場合はnullを返す関数。戻り値:
各エントリに指定された検索関数を適用したnull以外の結果。存在しない場合はnull
API ドキュメントを見るとそれぞれ key と value に対する操作、key に対する操作、value に対する操作、Map.Entry に対する操作を行うように分けられているようです。
search でも Function, BiFunction が便利に検索関数として使われています。
それでは順番にプログラムの処理を見ていきましょう。
key と value による検索
String searchResult = map.search(1, (key, value) -> key.length() > 7 && value.length() > 7 ? key + ” = ” + value : null);
key が 7 文字より長くかつ value が 7 文字より長い値をフィルタリングし、key, value を一つの文字列として返しています。(解りやすいように連結用の文字を追加しています。)
プログラムの実行結果は次のようになりました。
(○・ω・)ノ————- search ————-
searchResult: J2SE 1.3.1 = Ladybird
API ドキュメントによれば、複数検索条件に該当する場合、最初に検索に成功した一つだけが選択され、他は破棄されます。
よって検索条件に該当するものが複数ある場合必ず結果が同じになるとは限らない。
key による検索
String searchKeysResult = map.searchKeys(1, key -> key.length() > 7 ? key : null);
key が 7 文字より長い値をフィルタリングし、key を返しています。
プログラムの実行結果は次のようになりました。
(○・ω・)ノ————- searchKeys ————-
searchKeysResult: J2SE 1.2.2
value による検索
String searchValuesResult = map.searchValues(1, value -> value.length() > 7 ? value : null);
value が 7 文字より長い値をフィルタリングし、value を返しています。
プログラムの実行結果は次のようになりました。
(○・ω・)ノ————- searchValues ————-
searchValuesResult: Sparkler
Map.Entry による検索
String searchEntriesResult = map.searchEntries(1,
entry -> entry.getKey().length() > 7 && entry.getValue().length() > 7 ? entry.getKey() + ” = ” + entry.getValue() : null);
Entry の key と value をそれぞれ取得、そして key が 7 文字より長くかつ value が 7 文字より長い値をフィルタリングし、key, value を一つの文字列として返しています。(解りやすいように連結用の文字を追加しています。)
プログラムの実行結果は次のようになりました。
(○・ω・)ノ————- searchEntries ————-
searchEntriesResult: JDK 1.1.4 = Sparkler
reduce
それでは reduce を見てみましょう。
80 行目から 114 行目まで reduce を試しています。
reduce も search 同様に key と value, key, value, Map.Entry それぞれに操作を行うメソッドを用意しています。
API ドキュメントを見てみましょう。
public <U> U reduce(long parallelismThreshold,
BiFunction<? super K,? super V,? extends U> transformer,
BiFunction<? super U,? super U,? extends U> reducer)指定されたリデューサを使用して値を結合することにより、すべての(キー、値)ペアの指定された変換の累積結果を返します。結果がない場合はnullを返します。
型パラメータ:
U – トランスフォーマの戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
transformer – 要素の変換を返す関数。変換がない場合はnull(その場合、値は結合されない)
reducer – 交換可能性と結合性を持つ結合関数戻り値:
すべての(キー、値)ペアの指定された変換を累積した結果public <U> U reduceKeys(long parallelismThreshold,
Function<? super K,? extends U> transformer,
BiFunction<? super U,? super U,? extends U> reducer)指定されたリデューサを使用して値を結合することにより、すべてのキーの指定された変換の累積結果を返します。結果がない場合はnullを返します。
型パラメータ:
U – トランスフォーマの戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
transformer – 要素の変換を返す関数。変換がない場合はnull(その場合、値は結合されない)
reducer – 交換可能性と結合性を持つ結合関数戻り値:
すべてのキーの指定された変換を累積した結果public <U> U reduceValues(long parallelismThreshold,
Function<? super V,? extends U> transformer,
BiFunction<? super U,? super U,? extends U> reducer)指定されたリデューサを使用して値を結合することにより、すべての値の指定された変換の累積結果を返します。結果がない場合はnullを返します。
型パラメータ:
U – トランスフォーマの戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
transformer – 要素の変換を返す関数。変換がない場合はnull(その場合、値は結合されない)
reducer – 交換可能性と結合性を持つ結合関数戻り値:
すべての値の指定された変換を累積した結果public <U> U reduceEntries(long parallelismThreshold,
Function<Map.Entry<K,V>,? extends U> transformer,
BiFunction<? super U,? super U,? extends U> reducer)指定されたリデューサを使用して値を結合することにより、すべてのエントリの指定された変換の累積結果を返します。結果がない場合はnullを返します。
型パラメータ:
U – トランスフォーマの戻り値の型パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
transformer – 要素の変換を返す関数。変換がない場合はnull(その場合、値は結合されない)
reducer – 交換可能性と結合性を持つ結合関数戻り値:
すべてのエントリの指定された変換を累積した結果
reduce は、int, long, double などを便利に使えるようにする reduce○△×□ToInt, reduce○△×□ToLong, reduce○△×□ToDouble なども用意されています。
これらは、入力をプリミティブ型に変換して基準値を指定して累積計算して値を返すというものです。
今回のプログラムでは、reduceValuesToInt だけ試しています。
API ドキュメントでは次のようになっています。
public int reduceValuesToInt(long parallelismThreshold,
ToIntFunction<? super V> transformer,
int basis,
IntBinaryOperator reducer)指定されたリデューサを使用して値を結合し、指定された基準を識別値として使用して、すべての値の指定された変換の累積結果を返します。
パラメータ:
parallelismThreshold – このオペレーションを並列的に実行するために必要な(推定の)要素数
transformer – 要素の変換を返す関数basis – リダクションの識別(初期のデフォルト値)
reducer – 交換可能性と結合性を持つ結合関数戻り値:すべての値の指定された変換を累積した結果
それではプログラムを見ていきましょう。
key と value によるリデュース
String reduceResult = map.reduce(1,
(key, value) -> key + ” = ” + value,
(s1, s2) -> s1 + System.getProperty(“line.separator”) + s2);
第二引数の BiFunction で要素の変換処理(key, value を一つの文字列として返しています。(解りやすいように連結用の文字を追加しています。)をして返し、
第三引数の BiFunction で返された値をシステムの改行文字を加えて結合して返します。
このプログラムの実行結果は次のようになります。
(○・ω・)ノ————- reduce ————-
reduceResult
JDK 1.1.7 = Brutus
Java SE 9 =
JDK 1.1.8 = Chelsea
Java SE 8 = null
JDK 1.1.5 = Pumpkin
JDK 1.1.6 = Abigail
J2SE 1.2 = Playground
JDK 1.1.4 = Sparkler
Java SE 7 = Dolphin
Java SE 6 = Mustang
J2SE 1.2.2 = Cricket
J2SE 1.3.1 = Ladybird
J2SE 1.2.1 = null
J2SE 1.3 = Kestrel
J2SE 1.4 = Merlin
J2SE 5.0 = Tiger
J2SE 1.4.2 = Mantis
J2SE 1.4.1 = Hopper
key によるリデュース
String reduceKeysResult = map.reduceKeys(1,
key -> key.length() > 9 ? key : null,
(s1, s2) -> s1 + System.getProperty(“line.separator”) + s2);
第二引数の Function で要素の変換処理 key が 9 文字より長い値をフィルタリングし、key を返し、
第三引数の BiFunction で返された値をシステムの改行文字を加えて結合して返します。
このプログラムの実行結果は次のようになります。
(○・ω・)ノ————- reduceKeys ————-
reduceKeysResult
J2SE 1.2.2
J2SE 1.3.1
J2SE 1.2.1
J2SE 1.4.2
J2SE 1.4.1
value によるリデュース
String reduceValuesResult = map.reduceValues(1,
value -> value.length() > 4 && value.length() < 7 ? value : null,
(s1, s2) -> s1 + System.getProperty(“line.separator”) + s2);
第二引数の Function で要素の変換処理 value が 4 文字より長く、かつ value が 7 文字より短い値をフィルタリングし、value を返し、
第三引数の BiFunction で返された値をシステムの改行文字を加えて結合して返します。
このプログラムの実行結果は次のようになります。
(○・ω・)ノ————- reduceValues ————-
reduceValuesResult
Brutus
Merlin
Tiger
Mantis
Hopper
Map.Entry によるリデュース
String reduceEntriesResult = map.reduceEntries(1,
entry -> entry.getKey().length() > 7 && entry.getValue().length() > 7 ? entry.getKey() + ” = ” + entry.getValue() : null,
(s1, s2) -> s1 + System.getProperty(“line.separator”) + s2);
第二引数の Function で要素の変換処理 Map.Entry の key と value をそれぞれ取得、そして key が 7 文字より長くかつ value が 7 文字より長い値をフィルタリングし、
Map.Entry の key, value を一つの文字列として返しています。(解りやすいように連結用の文字を追加しています。)
第三引数の BiFunction で返された値をシステムの改行文字を加えて結合して返します。
このプログラムの実行結果は次のようになります。
(○・ω・)ノ————- reduceEntries ————-
reduceEntriesResult
J2SE 1.2 = Playground
JDK 1.1.4 = Sparkler
J2SE 1.3.1 = Ladybird
reduceValuesToInt
int reduceMaxValuesResult = map.reduceValuesToInt(1,
String::length,
0,
Integer::max);
第二引数の ToIntFunction で value の文字列の長さを返し(int 値)、
第三引数の基準値を元に、
第四引数の IntBinaryOperator で最大値を求める累積処理を行い最大値を返します。
このプログラムの実行結果は次のようになります。
(○・ω・)ノ————- reduceMaxValuesResult ————-
10
これで、ざっくり試してみたのでお終いです。
(○・ω・)ノ————- お終い! ————-
今回は試さなかったけど次のような優れものが Java SE 8 から追加されてます。
public static <K> ConcurrentHashMap.KeySetView<K,Boolean> newKeySet()
指定された型からBoolean.TRUEへの、ConcurrentHashMapに連動する新しいSetを作成します。
型パラメータ:
K – 返されるセットの要素型戻り値:
新しいセットpublic static <K> ConcurrentHashMap.KeySetView<K,Boolean> newKeySet(int initialCapacity)
指定された型からBoolean.TRUEへの、ConcurrentHashMapに連動する新しいSetを作成します。
型パラメータ:
K – 返されるセットの要素型パラメータ:
initialCapacity – 実装は、この多数の要素を格納するように内部のサイズ設定を実行する。戻り値:
新しいセット例外:
IllegalArgumentException – 要素の初期容量が負の場合
Java SE 8 から Map が強力になった。
特に ConcurrentHashMap は、J2SE5.0 の時と比べるとかなり使い勝手がよくなった。(^_^)
さて、ここでもう一度 ConcurrentHashMap の特徴を思い出してみましょう。
ConcurrentHashMap は、すべてのメソッドを共通のロックで同期化してアクセスを一度に一つのスレッドに限定する代わりに、
ロックストライピングという小さな粒度のロック方式を採用することにより並行処理能力を高めています。
ConcurrentHashMap は、Iterator および Enumeration は、ある時点または反復子/列挙の作成以降のハッシュテーブルの状態を反映する要素を返すように設計されているため、
ConcurrentModificationException をスローすることはありません。
なにより興味深いのはデータの取得時にはブロックすることがなく、更新についてはユーザーが並行レベルを設定できるということです。
ちょっと最後の並行レベルを設定できると言うところを試してみました。
ConcurrentHashMap インスタンスを生成するコンストラクタは 5 種類あります。
その中で並行レベルを設定できるのは次の引数を持つコンストラクタです。
public ConcurrentHashMap(int initialCapacity,
float loadFactor,
int concurrencyLevel)指定された要素数(initialCapacity)、テーブル密度(loadFactor)および並行更新数のしきい値(concurrencyLevel)に基づく初期テーブル・サイズで、新しい空のマップを作成します。
パラメータ:
initialCapacity – 初期容量。負荷係数を指定すると、実装はこの数の要素を格納できるように内部のサイズ設定を行う。
loadFactor – 初期テーブル・サイズを設定するための負荷係数(テーブル密度)
concurrencyLevel – 並行して更新しているスレッドの推定数。実装はこの値をサイズ設定のヒントとして使用できる。例外:
IllegalArgumentException – 初期容量が負であるか、負荷係数またはconcurrencyLevelが正でない場合
第三引数の int concurrencyLevel が並行レベルにあたります。
Java SE 8 の API ドキュメントでは初期容量、負荷係数、並行レベルのデフォルト値が明記されなくなりました。(初期容量は 16 とありました)
ソースを覗いてみると初期容量 16, 負荷係数 0.75f, 並行レベル 16 となっています。昔と変わって無さそうです。
並行レベルの設定で大きな違いがでるのかつぎのような ConcurrentHashMap インスタンスを生成して試してみました。
ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<>();
ConcurrentHashMap<Long, Long> map2 = new ConcurrentHashMap<>(16, 0.75f, 1);
下記 udDate() メソッドを同時に実行するスレッドを 32 個作って試してみました。
CompletableFuture<Void> cnt_1 = CompletableFuture.runAsync(() -> upDate.upDate()); こんなのを 32 個 (^_^;)
void upDate() {
for (long i = 0; i < 100_000_000; i++) {
map.computeIfAbsent(i, (k) -> k);
if ((i & 0b1) == 0) {
map.remove(i);
}
}
for (long i = 0; i < 100_000_000; i++) {
map.computeIfAbsent(i, (k) -> k * 10);
map.getOrDefault(i, – 1L);
}
}
プログラムの実行結果は次のようになりました。
ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<>(); // 5分33秒14428361700004189
ConcurrentHashMap<Long, Long> map2 = new ConcurrentHashMap<>(16, 0.75f, 1); // 34分42秒4334219810002651
凄く大差がつきました。
ちなみに実行環境は、先ほどのプログラムを実行しているコンピュータで論理 CPU 数 32 個、VM オプションの ForkJoinPool の制限無しで試してみました。
以上、ConcurrentHashMap でのお遊びはこれでお終いです。
TAGS: Java | 2015年6月13日10:45 PM | Comment : 1