|
|||||||||||||||||||||||
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | CONTACT | | | | |||||||||||||||||||||||
|
Java 7 - Thread-Synchronisation mit Hilfe des Phasers
|
||||||||||||||||||||||
Im Rahmen unserer Artikelreihe über die Neuerungen in Java 7 wollen wir dieses Mal den Phaser ansehen. Dabei geht es nicht um jene galaktische Strahlenkanone, mit der sich die Star-Trek-Helden verteidigt haben, sondern es gibt einen Synchronizer im JDK-Package java.util.concurrent , der so heißt, weil er in Phasen abläuft. Er hat ähnliche Eigenschaften wie die bereits seit Längerem existierenden Synchronisationsmittel CountDownLatch und CyclicBarrier , ist aber deutlich flexibler. Der Phaser ist ebenso wie das CountDownLatch und die CyclicBarrier eine der Abstraktionen aus dem JDK, die die Synchronisation von Threads unterstützen. Allen drei Abstraktionen ist gemeinsam, dass sie ein sogenanntes Thread-Rendezvous implementieren. Das Thread-Rendezvous-Pattern beschreibt das Treffen von zwei oder mehr Threads, die zusammen kommen, um sich zu synchronisieren, Daten auszutauschen oder ihre Aktionen miteinander zu koordinieren. Dabei unterscheidet man zwischen waiting , timed , und balking Rendezvous, je nachdem ob die am Treffpunkt ankommenden Threads beliebig lange aufeinander warten (waiting) oder nach einer gewissen Wartezeit den Treffpunkt verlassen (timed) oder sofort weggehen, wenn der oder die anderen Threads nicht schon da sind (balking). Ein Beispiel für ein wartendes Rendezvous wäre das Treffen mehrerer Worker-Threads, die darauf warten, dass ein anderer Thread alle erforderlichen Vorbereitungen erledigt und dann zum Treffpunkt kommt, ehe die Worker-Threads mit ihrer Arbeit beginnen; die Ankunft des vorbereitenden Threads ist dann so eine Art Start-Signal.
Hinter allen Rendezvous-Implementierungen steckt in der Regel ein Zähler
(z.B. für die schon angekommenen oder die noch am Treffpunkt erwarteten
Threads), der rauf oder runter gezählt wird, bis er einen Schwellenwert
erreicht. Das Erreichen des Schwellenwerts löst ein Ereignis
aus, auf das man warten oder abfragen kann.
Die tradionellen Synchronizer CyclicBarrier und CountDownLatchBeim CountDownLatch ist die Verwendung eines Zählers am offensichtlichsten: ein CountDownLatch wird konstruiert, indem man einen Anfangswert N > 0 angibt. Mit der Methode countDown() wird bei jedem Aufruf der Zähler dekrementiert. Mit der Methode await() kann man darauf warten, dass der Zähler endlich den Wert 0 erreicht. Diesen einfachen Count-Down-Mechanismus kann man sehr bequem für Start- und Ende-Signale verwenden. Wenn zum Beispiel ein Thread Vorbereitungen machen soll, ehe n andere Threads mit ihrer Arbeit beginnen können, dann kann man mit einem CountDownLatch(1) ein Startsignal geben. Die n Worker-Threads würden am CountDownLatch warten, bis der Zähler runtergezählt wird, ehe sie mit ihrer eigentlichen Arbeit beginnen. Der vorbereitende Thread würde nach Beendigung der Vorbereitungen den CountDown machen und damit die wartenen Worker-Threads aufwecken. Ganz ähnlich kann man ein Ende-Signal geben. Dazu würde man ein CountDownLatch(n) verwenden, wobei n die Zahl der Worker-Threads ist, auf deren Ergebnis man warten will. Jeder Worker-Thread würde am Ende nach getaner Arbeit einen CountDown machen. Wenn der letzte Worker-Thread fertig wird, passiert der CountDown auf 0 und dann wacht der nachverarbeitende Thread auf, der am CountDownLatch gewartet hat.Die CyclicBarrier ist ähnlich. Sie ist für das Rendezvous mehrerer Threads konzipiert, die sich treffen und aufeinander warten wollen, bis alle Parteien am Treffpunkt angekommen sind. Eine CyclicBarrier wird mit der Anzahl der beteiligten Parteien konstruiert. Auch hier wird ein Zähler runter gezählt: es wird gezählt, wie viele Parteien noch ankommen müssen, ehe alle da sind. Wenn alle am Treffpunkt angekommen sind, d.h. wenn der Zähler der noch fehlenden Parteien den Wert 0 erreicht, dann wachen alle Threads auf, die am Treffpunkt gewartet haben. Beides, das Ankommen und das Warten, ist in der Methode await() kombiniert. Auch mit einer CyclicBarrier kann man Start- und Ende-Signale geben. Für ein Start-Signal würde man zum Beispiel eine CyclicBarrier(n+1) verwenden, wobei n die Zahl der Worker-Threads ist. Die n Worker-Threads würden wieder am Anfang einen await() an der Barriere machen. Dann sind bereits n der n+1 Parteien angekommen und warten darauf, dass der fehlende Thread ankommt. Der „fehlende“ Thread ist der vorbereitende Thread, der erst an der Barriere ankommt, wenn er seine Vorbereitungen erledigt hat. Sein Aufruf von await() ist dann das Start-Signal für die wartenden Worker-Threads. Der wesentliche Unterschied zwischen der CyclicBarrier und dem CountDownLatch ist die Wiederverwendbarkeit. Bei der CyclicBarrier kann man das Treffen beliebig oft „cyclic“ (= zyklisch) wiederholen. Ein CountDownLatch hingegen kann man nur einmal verwenden, wie der Namensbestandteil „latch“ (= Riegel, Schnappverschluss) andeutet: es erreicht nur einmal seinen Endzustand und rastet dann ein. Um im obigen Beispiel ein Ende-Signal von den Worker-Threads an einen nachbereitenden Thread zu schicken, könnte man dieselbe CyclicBarrier verwenden, die auch bereits für das Start-Signal benutzt wurde. In der Lösung mit dem CountDownLatch benötigt man hingegen wegen der fehlenden Wiederverwendbarkeit zwei verschiedene Latches.
CyclicBarrier
und
CountDownLatch
haben auch Gemeinsamkeiten,
insbesondere im Unterschied zum
Phaser
. Gemeinsam ist ihr Mangel
an Flexibilität, denn in beiden Fällen muss die Zahl der beteiligten
Parteien bzw. der Anfangswert des Zählers bei der Konstruktion schon
bekannt sein und kann auch später nicht mehr dynamisch geändert
werden. Probleme, bei denen sich die Zahl der beteiligten Parteien
dynamisch verändert, lassen sich weder mit der
CyclicBarrier
noch mit dem
CountDownLatch
lösen. Eine solche Problemstellung
wäre zum Beispiel das Durchsuchen von Baumstrukturen. Wenn man
für jeden Zweig im Baum einen neuen Worker-Thread losschicken will,
dann ergibt sich die Zahl der benötigten Worker-Threads erst im Laufe
der Navigation. Zum Konstruktionszeitpunkt von
CyclicBarrier
oder
CountDownLatch
weiß man also die Zahl der beteiligten
Parteien noch nicht. Der
Phaser
hat dieses Defizit nicht:
beim
Phaser
können jederzeit weitere Threads registriert
werden, so dass sich das „Baum“-Problem elegant lösen lässt.
Ansonsten ähnelt der
Phaser
stärker der
CyclicBarrier
als dem
CountDownLatch
, weil er ebenfalls mehrfach verwendet werden
kann. Was man bei der
CyclicBarrier
als Zyklus bezeichnet,
entspricht der Phase beim
Phaser
.
Der neue Phaser-SynchronizerSehen wir uns den Phaser genauer an. Intern besteht der Phaser aus fünf Informationen:
Die beteiligten Parteien können bereits bei der Konstruktion registriert werden. Es können aber auch nach der Konstruktion noch Parteien über die Methoden register() und bulkRegister(int parties) hinzugefügt werden. Das ist anders als bei der CyclicBarrier , bei der die Zahl der beteiligten Parteien bei der Konstruktion festgelegt wird und nicht mehr geändert werden kann. Die Ankunft einer Partei am Treffpunkt wird durch die Methode arrive() ausgelöst. Die arrive() -Methode kehrt sofort zurück, d.h. die eingetroffene Partei sagt, dass sie am Treffpunkt war, und macht sofort weiter; sie wartet nicht auf das Eintreffen der anderen Parteien. Intern wird beim arrive() lediglich die Zahl der eingetroffenen Parteien inkrementiert bzw. die der noch erwarteten Parteien dekrementiert. Wenn man warten will, bis alle da sind, dann kann man die Methode arriveAndAwaitAdvance() verwenden. Sie entspricht der await() -Methode der CyclicBarrier . Wenn alle erwarteten Parteien am Treffpunkt angekommen sind, d.h. wenn die Zahl der eingetroffenen Parteien gleich der Zahl der registrierten Parteien ist (und damit die Zahl der noch erwarteten Parteien gleich Null), dann passiert der sogenannte Advance . Dabei rückt der Phaser in die nächste Phase vor: die Phasen-Nummer wird inkrementiert und die Zahl der eingetroffenen und noch nicht eingetroffenen Parteien werden zurückgesetzt. Das ist ähnlich wie bei der CyclicBarrier , mit dem Unterschied, dass die CyclicBarrier ihre Zyklen nicht numeriert. Die Phasennummer kann in der Methode awaitAdvance(int phase) verwendet werden, wenn auf das Ende einer bestimmten Phase gewartet werden soll. Registrierte Parteien können sich vom Phaser verabschieden, indem sie über die Methode arriveAndDeregister() mitteilen, dass sie in dieser Phase angekommen sind, aber an den nachfolgenden Phasen nicht mehr teilnehmen wollen. Intern wird die Zahl der registrierten Parteien dekrementiert. Die Zahl der noch erwarteten Parteien wird ebenfalls dekrementiert, weil die Partei angekommen ist, also nicht mehr erwartet wird. Die Zahl der angekommenen Parteien bleibt wegen der gleichzeitigen Deregistrierung unverändert, denn die Summe der eingetroffenen und noch erwarteten Parteien muss immer gleich der Zahl der registrierten Parteien sein.
Wenn sich alle registrierten Parteien verabschiedet haben, d.h. die
Zahl der registrierten Parteien auf Null sinkt, dann geht der
Phaser
in seinen Terminierungszustand. In diesem Zustand ist die Phasennummer
kleiner Null, also ungültig. Alle wartenden Methoden kehren bei Erreichen
des Terminierungszustands zurück, weil die letzte Phase vorbei ist.
Eine einfache Benutzung des PhasersSehen wir uns nur einige Anwendungsbeispiele an. Beginnen wir mit dem oben schon beschriebenen Start-Signal für eine Gruppe von Worker-Threads.Unter Verwendung einer CyclicBarrier sieht es so aus: public void split(List<Runnable> tasks, Runnable setup) { final CyclicBarrier barrier = new CyclicBarrier( tasks.size()+1 ); // create and start all worker threads
for (final Runnable task
: tasks) {
// perform setup and send start signal
setup.run();
Unter Verwendung eines Phaser s sieht es ganz ähnlich aus:
public void split(List<Runnable> tasks, Runnable
setup) {
// create and start all worker threads
for (final Runnable task
: tasks) {
// perform setup and send start signal
setup.run();
Man kann es mit dem Phaser s auch ein wenig anders machen, was in diesem simplen Beispiel mit nur einer einzigen Phase und einer fixen Zahl von Beteiligten nicht nötig wäre. Also, nur zur Demonstration der Phaser -spezifischen Methoden:
public void split(List<Runnable> tasks, Runnable
setup) {
phaser.register() ; // register preparing thread // create and start all worker threads
for (final Runnable task
: tasks) {
// perform setup and send start signal
setup.run();
Wir haben dieses Mal das Start-Signal nicht mit arriveAndAwaitAdvance() gegeben, sondern mit arrive() . Das bedeutet, dass der vorbereitende Thread nicht auf das Ende der Phase wartet. Das ist auch gar nicht nötig, weil der vorbereitende Thread ohnehin der letzte ankommende Thread ist und damit derjenige, der die Phase beendet.
Außerdem haben wir die Zahl der Parteien nicht vorher berechnet
und bei der Konstruktion des
Phaser
s angegeben, sondern wir haben
erst einen leeren
Phaser
konstruiert und die beteiligten Parteien
später nach und nach registriert. Der Effekt ist der derselbe,
aber man kann gut sehen, wie man beim
Phaser
eine vorher nicht
bekannte Anzahl von Parteien registrieren würde. Außerdem
ist diese Form der Registrierung vom Programmierstil her besser, weil sie
das Prinzip der Ownership respektiert.
Eine flexiblere Benutzung des PhasersSpannender als das obige Beispiel sind Situationen, in denen die Flexibilität des Phaser s bei der Registrierung der Parteien auch tatsächlich gebraucht wird. Betrachten wir das eingangs schon erwähnte „Baum“-Beispiel. Nehmen wir einmal an, wir wollen alle Dateien in einem Verzeichnis und allen seinen Unterverzeichnissen zählen. Dabei wollen wir für jedes Unterverzeichnis eine eigene Task erzeugen, die dann wiederum eigene Subtasks erzeugt für ihre jeweiligen Unterverzeichnisse. Die Zahl der benötigten Tasks ergibt sich also erst im Laufe der Navigation durch den Verzeichnisbaum und steht nicht von Vornherein fest. Die Tasks sollen einem Thread-Pool zur Ausführung übergeben werden. Wenn wir einen ThreadPoolExecutor verwenden, dann stehen wir vor dem Problem, dass wir den Pool nach Abarbeitung aller Tasks herunterfahren müssen. Wir brauchen also ein Shutdown-Signal, das uns sagt, wann die Zerlegung in Tasks erfolgt ist, alle Tasks an den Thread-Pool übergeben wurden und alle Task fertig geworden sind. Dafür kann man einen Phaser verwenden.public class DirectoryTraverser {In diesem Beispiel haben wir einen leeren Phaser erzeugt, an dem sich jede Task in ihrem Konstruktor registriert und nach Beendigung ihrer Arbeit, d.h. am Ende von run() wieder deregistriert. Da hier ausschließlich mit der Methode arriveAndDeregister() gearbeitet wird, bleibt die Zahl der bereits eingetroffenen Parteien stets gleich Null, weil die Parteien ankommen und sofort wieder weggehen. Damit eine Phase endet, muss die Zahl der angekommenen Parteien gleich der Zahl der registrierten Parteien sein. Die erste und einzige Phase endet also dann, wenn die Zahl der registrierten Parteien auf Null (gleich Zahl der bereits eingetroffenen Parteien) sinkt. Da sich die Tasks rekursiv in Subtasks zerlegen, wird die Zahl der registrierten Tasks erst den Wert Null erreichen, wenn die letzte Task endet, die sich nicht mehr in Subtasks zerlegt hat. Das heißt, wir brauchen im main -Thread nur auf das Ende der ersten Phase zu warten; dann wissen wir, dass alle Tasks fertig sind und wir den Thread-Pool herunterfahren können. Die Terminierung des Phasers und die onAdvance() -MethodeEingangs hatten wir gesagt, dass der Phaser in einen Terminierungszustand geht, wenn sich alle Parteien deregistriert haben. Das ist aber lediglich das Default-Verhalten. Man kann dieses Verhalten ändern, indem die protected Methode onAdvance() der Phaser -Klasse überschrieben wird.Diese onAdvance() -Methode ähnelt der Barrier Action bei der CyclicBarrier : sie wird aufgerufen, wenn die Phase endet, und zwar von dem Thread, der als letzter am Treffpunkt ankommt und das Phasen-Ende auslöst. Man kann die onAdvance() -Methode verwenden, um damit Nachverarbeitungen zu erledigen. Ihr Returnwert hat darüber hinaus eine besondere Bedeutung: er steuert den Übergang in den Terminierungszustand. Solange die onAdvance() -Methode false zurückliefert, gibt es weitere Phasen. Wenn die onAdvance() -Methode true liefert, dann bedeutet es: „Dies war die letzte Phase.“ Dann geht der Phaser in den Terminierungszustand. Die Default-Implementierung der onAdvance() -Methode sieht so aus:
protected boolean onAdvance(int phase,
int registeredParties) {
Man kann die Terminierungsbedingung also nach Belieben anpassen. Zum Beispiel könnte man dieses Mittel verwenden, um ein Reihe von Tasks mehrfach zu wiederholen. Das könnte so aussehen: void repeatTasks(List<Runnable> tasks, final int repetitions, boolean await_termination) {Es wird pro Task je ein Worker-Thread erzeugt, der sich registriert und anschließend gestartet wird. Jeder Thread führt seine Task aus und wartet anschließend am Phaser auf das Eintreffen der anderen Worker-Threads. Das Ganze wiederholt jeder Worker-Thread solange, bis der Phaser terminiert. Bei jeder Wiederholung erhöht sich die Phasen-Nummer. Die Terminierung des Phaser s wird durch die überschriebene onAdvance() -Methode gesteuert. Sie liefert true zurück, wenn die Phasen-Nummer die Zahl der Wiederholungen erreicht hat. Wenn der main -Thread warten soll, bis die Worker-Threads mit ihren Wiederholungen fertig sind, dann muss auch er auf die Terminierung des Phaser s warten. Das kann er tun, indem er sich registriert und mit den Worker-Threads auf die Beendigung der einzelnen Phasen wartet, solange bis der Phaser terminiert. ZusammenfassungDer Phaser ist ein neuer Synchronizer, der mit Java 7 zum Package java.util.concurrent hinzugekommen ist. Er ähnelt der CyclicBarrier , da auch der Phaser das Rendezvous von mehreren Parteien (Threads oder Tasks) organisiert. Der Phaser ist in vieler Hinsicht deutlich flexibler als die CyclicBarrier :
Die gesamte Serie über Java 7:
|
|||||||||||||||||||||||
© Copyright 1995-2013 by Angelika Langer. All Rights Reserved. URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/63.Java7.Phaser/63.Java7.Phaser.html> last update: 24 Jan 2013 |