|
|||||||||||||||||||||||||||||||||||||
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | CONTACT | | | | |||||||||||||||||||||||||||||||||||||
|
Effective Java
|
||||||||||||||||||||||||||||||||||||
Diesmal wollen wir uns genauer ansehen, wie parallele Streams mit blockierender Funktionalität umgehen und welche Möglichkeiten man als Stream-Nutzer hat, sie dabei zu beeinflussen. Die Ausgangssituation
In der Software-Architektur gibt es das
grundsätzliche Prinzip, den Durchsatz bei blockierender Funktionalität
durch die Verwendung von mehreren parallelen Threads zu erhöhen. Typische
Beispiele für blockierende Funktionalität sind synchrone Netzwerk-I/O
über HTTP, synchrone Datenbankzugriffe über JDBC, usw. . Verwendet
man mehrere Threads, um diese blockierende Funktionalität parallel auszuführen,
so lässt sich im Allgemeinen der Durchsatz steigern. Um solche Lösungen
mit mehreren parallelen Threads zu implementieren, ist einiges an Design-
und Implementierungsaufwand erforderlich. Außerdem braucht man ausreichendes
Know-How im Bereich von Multi-Thread-Programming, z. B. zum Umgang mit
Thread-Pools, zur Ergebnis-Synchronisation, usw. .
Jetzt, da parallele Streams ein Teil der
JDKs sind, gibt es die Hoffung, dass sich solche Lösungen deutlich einfacher
implementieren lassen: Man übergibt einfach die blockierende Funktionalität
in einem Lambda-Ausdruck an eine geeignete Stream-Operation eines parallelen
Streams.
Leider sind parallele Streams nicht uneingeschränkt für eine solche Benutzung geeignet. Wo die Probleme einer solchen Lösung sind und welche zusätzlichen Möglichkeiten man als Stream-Nutzer hat, diese Probleme zu umgehen bzw. zu minimieren, wollen wir uns im folgenden Artikel im Detail ansehen. Ein Beispiel
Schauen wir uns zuerst ein konkretes Beispiel
für blockierende Funktionalität an. Wir wollen uns mit Hilfe eines
HTTP-Requests Informationen (Symbol, Aktienkurs, sowie Veränderung des
Kurses) zu dem Tickersymbol eines börsennotierten Unternehmens holen:
public static String getStockInfo(String s) {
Diese blockierende Funktionalität übergeben
wir in einem Lambda-Ausdruck an die
map()
Operation eines Streams. Das heißt, in der
map()
-Operation
wird das Tickersymbol (
String
) auf einen Ergebnis-
String
abgebildet, der Symbol, Aktienkurs, sowie Veränderung des Kurses enthält.
Hier ist die gesamte Stream-Verarbeitung:
private static String[]
stockSymbols
= {"GOOG",
"AAPL", "MSFT", "YHOO"};
}
Als Stream-Source verwenden wir ein Array
mit vier Tickersymbolen. Nach der bereits oben beschriebenen Abbildung
des Tickersymbols auf einen
String
mit
den weiteren Aktieninformationen geben wir diesen String zusammen mit dem
Namen des verantwortlichen Threads in einem
peek()
auf
System.out
aus. Danach bilden wir
den
String
mit der Aktieninformation
auf ein
StockData
Objekt ab.
StockData
ist eine Klasse mit drei Feldern:
private String corpSym; // Tickersymbol
sowie einem Konstruktor, der die drei Felder
setzt und je einer
Getter
-Methode für jedes Feld. Die Methode
createStockData()
macht also nichts anderes, als den
String
mit der Aktieninformation zu parsen und mit den dabei ermittelten Werten
den
StockData
-Konstruktor aufzurufen.
Den vollständigen Source-Code von
createStockData()
und
StockData
zeigen wir aus Platzgründen
nicht in diesem Artikel. Wer sich für den vollständigen Source-Code
interessiert, findet ihn unter /
SRC
/. Man kann den
Source-Code auch herunterladen, um das Beispiel selbst am eigenen Rechner
laufen zu lassen und mit den Effekten zu experimentieren.
Die anschließende
filter()
-Operation
wird benötigt, um
null
-Werte aus dem
Stream zu entfernen. Sie können entstanden sein, weil sowohl
getStockInfo()
als auch
createStockData()
im Fehlerfall
auf
null
abbilden. Als terminale Operation
wird am Ende in einem
reduce()
die Aktie
mit der größten Wertsteigerung ermittelt und ausgegeben.
Schauen wir uns nun an, wie die Ausgabe aussieht, wenn wir das Programm auf einem Dual-Core-Rechner ablaufen lassen. Zuerst mit einem sequentiellen Stream:
Abbildung 1: Ausgabe bei Ablauf mit einem
sequentiellen Stream
Jede Zeile, die im
peek()
ausgegeben wird, erscheint mit einer gewissen Verzögerung in der Ausgabe.
Das liegt an der Latenzzeit des blockierenden HTTP-Requests. Die letzte
peek()
-Ausgabe
erscheint zusammen mit dem Ergebnis in der Ausgabe. Die Unterbrechungen
in dem schwarzen Strich vorne vor der Ausgabe sollen genau dieses Anhalten
der Ausgabe nach jeder der ersten drei Zeilen illustrieren. In der Ausgabe
der
peek()
-Aufrufe können wir auch sehen,
dass alle HTTP-Request-Aufrufe vom
main
-Thread
ausgeführt worden sind.
Und wie sieht das Ganze nun aus, wenn wir den Code mit einem parallelen Stream auf unserem Dual-Core-Rechner ausführen? Dazu müssen wir im Code oben den parallel() -Aufruf nach dem Erzeugen des Stream reinkommentieren. Die Ausgabe ist dann:
Abbildung 2: Ausgabe bei Ablauf mit einem
parallelen Stream
Jetzt werden immer zwei
peek()
-Aufrufe
zusammen ausgegeben und beim zweiten Paar wird auch die Ausgabe des Ergebnisses
noch gleich mit angehängt. Es werden je zwei
peek()
-Aufrufe
zusammen ausgegeben, weil jetzt zwei Threads (
main
und
ForkJoinPool.commonPool-worker-1
)
parallel den Stream abarbeiten.
Weil wir einen Dual-Core-Rechner verwenden
haben, werden bei einem parallelen Stream default-mäßig zwei Threads
verwendet. Parallele Streams sind so implementiert, dass sie im Default-Fall
auf einer Multi-Core-Plattform immer so viele Threads verwenden, wie sie
Prozessor-Cores mit Hilfe von
java.lang.Runtime.availableProcessors()
auf der Plattform finden. Einer der Threads ist dabei der Thread, der
die Stream-Operation angestoßen hat. Hier in unserem Beispiel ist es der
main
-Thread.
Die anderen Threads werden vom
java.util.concurrent.ForkJoinPool.commonPool()
zur Verfügung gestellt. Der CommonPool ist ein Singleton-Thread-Pool,
der mit Java 8 in den JDK aufgenommen wurde. In unserem Fall wird vom CommonPool
nur ein Thread (
ForkJoinPool.commonPool-worker-1
)
zur Verfügung gestellt, weil wir eine Dual-Core-Plattform verwenden.
Mehr zu parallelen Streams und dem CommonPool findet sich in einem unserer
vorhergehenden Artikel (/
KLS1
/).
Wir haben jetzt also mit Hilfe des parallelen Streams die Software-Architektur implementiert, die wir oben bereits theoretisch beschrieben hatten. Der Durchsatz der HTTP-Requests hat sich dank des parallelen Streams im Vergleich zum sequentiellen Stream erhöht (etwa verdoppelt), und der Implementierungsaufwand dafür ist minimal (lediglich ein zusätzlicher Aufruf von parallel() ) Wenn wir eine Plattform mit noch mehr Prozessor-Cores verwenden würden, könnten wir den Durchsatz sogar noch weiter steigern, weil dann default-mäßig noch mehr Threads bei Verwendung eines parallelen Streams zur Verfügung stehen würden. Das Problem
Leider hat diese Lösung ein Problem, das
aber nicht ganz offensichtlich ist. Was passiert, wenn wir in unserer
JVM weitere parallele Streams nutzen, kurz nachdem wir die Methode
test01()
mit einem parallelen Stream aufgerufen haben? Die Funktionalität der
nachfolgend gestarteten parallelen Streams kann den gemeinsam genutzten
Thread des CommonPools nicht verwenden, denn dieser muss die blockierende
Funktionalität der HTTP-Requests aus
test01()
verarbeiten - wobei das "Verarbeiten" im Wesentlichen darin besteht,
dass der CommonPool-Thread in dem blockierenden HTTP-Request wartet.
Schauen wir uns dazu ein weiteres Beispiel an, das die Situation noch
ein wenig kontrollierter auf den Punkt bringt:
private static void test02() {
Runnable task = () -> { // Zeile 2 try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace();
}
IntStream.range(100, 104).parallel() .forEach(i -> System.out.println(Thread.currentThread().getName() + ": nonblocking - " + i));
};
new Thread(task,"myThread").start();
// Zeile 14
IntStream.range(0,4).parallel() // Zeile 16 .map(i -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return i; }) .forEach(i -> System.out.println(Thread.currentThread().getName() + ": blocking - " + i));
}
Die wartende Funktionalität besteht diesmal
in einem
sleep()
von einer Sekunde, das
beim Abbilden eines
int
-Wertes (0 …
3) auf sich selbst auftritt. Danach wird der
int
-Wert
ausgegeben. Das Ganze ist ab Zeile 16 ff. implementiert.
Parallel dazu läuft der Thread myThread . Er wartet 1,5 Sekunden, bevor er dann nicht-blockierend die int -Werte von 100 bis 103 ausgibt (implementiert in Zeile 14 bzw. Zeile 2 ff.). Zusätzlich werden bei allen Ausgaben die Thread-Namen mit ausgegeben. Wie sieht nun die Ausgabe aus?
Abbildung 3: Ausgabe bei Ablauf zweier
konkurrierender paralleler Streams
Nach einer Sekunde werden die 2 und die 1
aus der blockierenden Funktionalität durch den
main
-Thread
und den einen Thread des CommonPools ausgegeben. Nach 1,5 Sekunden werden
102, 103, 101, 100 von dem
myThread
ausgegeben.
Der Thread des CommonPools kann sich nicht an der Ausgabe beteiligen, da
er im
sleep()
der blockierenden Funktionalität
wartet. Man sieht an der Tatsache, dass die Reihenfolge der
int
-Werte
nicht eingehalten wird, dass die Abarbeitung der nicht-blockierenden Funktionalität
im Parallel-Modus erfolgt ist - aber mit nur einem Thread! Nach zwei
Sekunden werden die 3 und die 0 aus der blockierenden Funktionalität durch
den
main
-Thread und den einen Thread
des CommonPools ausgegeben.
Wir wollen das Ergebnis noch einmal kurz verallgemeinert zusammenfassen: Blockierende Funktionalität in Streams bindet die gemeinsam genutzten CommonPool-Threads für ihre Aufgaben, die im Wesentlichen aus Warten bestehen. Trotzdem werden nachfolgende parallele Stream-Aufrufe ausgeführt. Es ist nicht so, dass nachfolgende Stream-Aufrufe abwarten müssen, bis die blockierende Funktionalität fertig geworden ist. Allerdings werden nachfolgende parallele Stream-Aufrufe nicht parallel, sondern von nur von einem Thread ausgeführt, nämlich dem Thread, der die nachfolgende parallele Stream-Operation angestoßen hat. Lösungsansätze
Welche Möglichkeiten hat man nun als Stream-Nutzer,
in das Thread-Management der parallelen Streams einzugreifen? Grundsätzlich
erst einmal relativ wenig.
Das liegt daran, dass die Designmaxime bei
der Entwicklung der Streams war: Streams und insbesondere parallele Streams
sollen möglichst einfach zu benutzen sein. Konfigurierbarkeit stand
deshalb nicht im Vordergrund. Es kommt noch hinzu, dass umfangreiche
Konfigurierbarkeit leicht zu Fehlkonfigurationen führen kann. Dies wollte
man aber auf jeden Fall vermeiden, damit das neue Feature Streams kein
schlechtes Image bekommt. Entsprechend sind parallele Streams vorwiegend
für CPU-intensive Funktionalität und nicht für blockierende Funktionalität
gedacht. Wie gut parallele Streams zu CPU-intensiver Funktionalität
passen, haben wir übrigens bereits in einem vorhergehenden Artikel über
Stream-Performance diskutiert (/
KLS2
/).
Es gibt also nicht den einen Schalter, um
parallele Streams an blockierende Funktionalität anzupassen. Aber es
gibt einige Möglichkeiten einzugreifen. Teilweise handelt es sich dabei
richtiggehend um Hacks, die so nie vorgesehen waren.
Wir schauen uns die Möglichkeiten im Folgenden genauer an. Dabei haben wir es so gemacht, dass wir mit den weniger aufwändigen Möglichkeiten beginnen. Parallelität des CommonPool verändern
Wir haben schon oben erwähnt, dass im
Default-Fall für die Bearbeitung eines parallelen Streams auf einer Multi-Core-Plattform
immer so viele Threads verwenden, wie Prozessor-Cores auf der Plattform
vorhanden sind. Diese Threads sind:
Genau genommen ist die Anzahl der CommonPool-Threads
nicht fix. In Ruhephasen werden die Threads sukzessive beendet und unter
Last werden sie wieder gestartet. Auch gibt es Ausnahmesituationen, in
denen der CommonPool mehr als (Anzahl Cores – 1) viele Threads enthalten
kann. Wir schauen uns diese spezielle Situationen unten im Absatz "
ForkJoinPool.ManagedBlocker
nutzen" an.
Da die Anzahl der CommonPool-Threads nicht
fix ist, spricht die Javadoc des CommonPools bzw.
ForkJoinPool
s
nicht von der "Anzahl der Threads des Pools" sondern von der "Parallelität
des Pools" (Englisch:
parallelism
).
Der Default-Wert der Parallelität des CommonPools
ist wie bereits gesagt auf einer Mulit-Core-Plattform: Anzahl Cores –
1. Die Parallelität lässt sich mit Hilfe der System Property
java.util.concurrent.ForkJoinPool.common.parallelism
beim Start der JVM auf einen anderen Wert setzen.
Was geschieht nun, wenn wir in unserem parallelen
Tickersymbol-Beispiel (
test01()
) die Parallelität
des CommonPools auf fünf erhöhen, also die JVM mit dieser Option starten:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
Die entsprechende Ausgabe sieht dann so aus:
Abbildung 4: Ausgabe bei Ablauf mit CommonPool-Parallelität=5
Jetzt kommen alle Ausgaben zugleich heraus,
da sechs Thread für den parallelen Stream zur Verfügung stehen. In
der Ausgabe können wir sehen, dass nur vier Threads benötigt wurden,
weil unsere Streamsource nur vier Elemente enthält. Das heißt, zwei
Pool-Threads stehen trotz blockierender Funktionalität im CommonPool für
weitere parallele Streams zur Verfügung.
In unserem kleinen Beispiel war die Erhöhung
der Parallelität des CommonPools erfolgreich. Wir führen unsere blockierende
Funktionalität mit Hilfe eines parallelen Streams aus, ohne dass wir alle
Threads des CommonPools blockieren. Das liegt natürlich daran, dass
der Stream klein genug war. Wäre das Array mit den Tickersymbolen deutlich
größer, dann hätten wir alle Threads des CommonPools blockiert. Dieser
Lösungsansatz über die Erhöhung der Parallelität des CommonPools funktioniert
also nur für relativ kleine parallele Streams.
Die Erhöhung der Anzahl der CommonPool-Threads hat noch einen weiteren kleinen Nachteil. Durch die höhere Anzahl der Threads kommt es zu mermehrtem Thread-Scheduling, was ein wenig die Performance verringert. Das ist übrigens der Grund für die relativ geringe Default-Anzahl der Threads im CommonPool. Dedizierten ForkJoinPool nutzen
Bei Workshops und Kursen kommt bei diesem
Thema meist die Frage: Kann man nicht einfach einen anderen Thread-Pool
anstelle des CommonPools für die blockierende Funktionalität nutzen?
Das wäre doch eine ganz gute Lösung. Die CPU-intensive Funktionalität
nutzt weiter gemeinsam den CommonPool und für die blockierende Funktionalität
gibt es einen dedizierten neuen Pool. Auf diese Weise kommen sich die
beiden unterschiedlichen Typen von Funktionalität nicht in die Quere.
Von der Architektur ist es eine überzeugende und intuitive Lösung.
Leider gibt es offiziell (Javadoc, Stream-Design,
usw.) keine solche Möglichkeit. Aber es gibt einen Hack, der auch auf
einschlägigen Internetseiten (zum Beispiel hier: /
SOCP
/)
beschrieben wird. Schauen wir uns zuerst an, wie es geht und diskutieren
danach diesen Ansatz.
Die Idee ist, dass das Aufrufen der Stream-Funktionalität
als ein
Runnable
oder
Callable
an einen eigenen
ForkJoinPool
übergeben
wird. Mit anderen Worten, wir sorgen dafür, dass der Thread, der die
parallele Stream-Operation anstößt, bereits ein Fork-Join-Pool-Thread
ist. Das führt dazu, dass die Threads dieses ForkJoinPools dann die
Stream-Operationen ausführen und der CommonPool gar nicht verwendet wird.
Dieser Effekt ergibt sich wegen der internen, privaten Methode
ForkJoinTask.doInvoke()
.
Sie prüft, ob sie von einem
ForkJoinWorkerThread
ausgeführt wird.
Die Implementierung für unser Tickersymbol-Beispiel ist relativ einfach:
Runnable task = () -> Arrays.
stream
(
stockSymbols
).parallel()
ForkJoinPool myPool = new ForkJoinPool(4);
Auf dem parallelen Stream führen wir die
gleiche Funktionalität wie in der Methode
test01()
aus. Wir müssen diese Funktionalität jetzt aber als
Task
formulieren, die an den
myPool
mit
submit()
übergeben wird. Unseren
ForkJoinPool
statten wir mit einer Parallelität von vier aus (siehe Konstruktorparameter).
Der CommonPool hat dagegen nur eine Parallelität von eins auf unserer
Dual-Core-Umgebung. Weiter ist zu beachten, dass man sich anders als beim
CommonPool um das Herunterfahren des
myPool
kümmern muss. Wir haben das Herunterfahren im obigen Beispielcode aber
aus Gründen der Vereinfachung nicht gezeigt.
Die Ausgabe ist diesmal:
Abbildung 5: Ausgabe bei Ablauf mit dediziertem
ForkJoinPool
Alle Zeilen werden wieder auf einen Schlag
ausgegeben, weil wir unserer ForkJoinPool-Instanz eine Parallelität von
vier gegeben haben. Der Pool hat default-mäßig den Namen
ForkJoinPool-1
bekommen und wir sehen, dass seine vier Threads den Stream abarbeiten.
Auch wenn diese Lösung mit unserem Beispiel
funktioniert, hat dieser Ansatz das Problem, dass es ein Hack ist. Das
zeigt sich zum Beispiel daran, dass einige interne Dinge nicht passen.
So nutzt der parallele Stream (genauer gesagt die
AbstractTask
)
immer die Parallelität des CommonPools, um die Anzahl der Zerlegungen
der Fork-Join-Tasks zu berechnen. Dies geschieht auch, wenn der Stream
auf einem ganz anderen
ForkJoinPool
läuft,
wie in dem Beispiel oben. Die Zerlegung wird in unserem Beispiel mit
Parallelism=1 (aus dem CommonPool) berechnet, aber hinterher mit Parallelism=4
(aus dem eigenen ForkJoinPool) ausgeführt. Es passt nicht zusammen und
macht wenig Sinn, führt aber anderseits auch nicht gleich zu katastrophalen
Problemen. Man sollte einfach darauf vorbereitet sein, dass mit dem gezeigten
Hack nicht alles so läuft, wie normalerweise mit dem CommonPool.
Es ist auch zu beachten, dass dieser Hack
möglicherweise irgendwann nicht mehr funktionieren wird. Brian Goetz,
der Chefarchitekt für dieses Thema bei Oracle, hat auf der JAX-Konferenz
2014 in einer Fragestunde zu Thema Java 8 zu diesem Hack gesagt: „I don’t
promise that it will work that way for the rest of time.“ In dem Video
der Fragestunde (/
BGJ8
/) findet man weitere Hintergründe,
sowie seine Einordnung des Hacks (Minute 30:50 bis 33:50 im Video).
Ergänzend kann man zu dieser Lösung noch sagen, dass sie möglicherweise als eine Art Architekturstudie dienen kann, um danach die blockierende Funktionalität mit Hilfe einer eigenen ForkJoinTask –Implementierung (also ohne Streams) selbst zu implementieren. Im Gegensatz zur Stream-Lösung ist das dann natürlich deutlich aufwändiger und erfordert entsprechendes Fork-Join-Framework Know-How. Dafür hat man dann aber eine stabile, zukunftssichere Lösung. Wie so etwas grundsätzlich mit dem Fork-Join-Framework geht, haben wir in zwei früheren Artikeln bereits diskutiert (/ KLS3 /, / KLS4 /). ForkJoinPool.ManagedBlocker nutzen
Bei dem letzten Lösungsansatz, den wir
uns anschauen wollen, geht es wieder darum, die Anzahl der Threads im CommonPool
zu erhöhen. Diesmal wollen wir sie aber nicht permanent erhöhen.
Stattdessen wollen wir ein Feature des
ForkJoinPools
nutzen, mit dem der Pool die mit wartender Funktionalität blockierten
Threads vorübergehend - für die Dauer ihrer Wartezeit - durch neue Pool-Threads
kompensiert. Diese interessante Funktionalität des
ForkJoinPools
wird selten erwähnt, ist aber gerade in unserer Situation sehr nützlich.
Da der CommonPool eine spezielle Instanz des
ForkJoinPool
s
ist, stellt auch er diese Funktionalität zur Verfügung.
Der ForkJoinPool hat ein nested Interface ManagedBlocker . Die blockierende Funktionalität muss dieses Interface implementieren und dem Pool bekannt geben, so dass dieser das Kompensationsfeature für die entsprechende Funktionalität aktiviert. Wie die Implementierung im Detail aussieht, schauen wir uns weiter unten an. Zuerst wollen wir uns ansehen, wie die Ausgabe im Falle unseres Tickersymbol-Beispiels nun aussieht:
Abbildung 6: Ausgabe bei Ablauf mit
ManagedBlocker
Die Ausgabe sieht genauso aus wie zuvor in
Abbildung 4. Dort hatten wir selbst als Nutzer die Parallelität und
damit die Anzahl der Threads des CommonPools erhöht. Dieses Mal erhöht
der CommonPool von sich aus die Anzahl der Threads, da er wegen der Verwendung
des
ManagedBlockers
wartende Threads
durch neu gestartete Threads kompensiert.
Schauen wir uns auch noch an, wie die Ausgabe für das Beispiel in test02() aussieht, wenn man die blockierende Funktionalität mit Hilfe von ManagedBlocker ausdrückt:
Abbildung 7: Ausgabe bei Ablauf zweier
konkurrierender paralleler Streams (blockierend mit
ManagedBlocker
)
Nach einer Sekunde werden nicht nur zwei
der vier blockierenden Werte (2, 1, 0, 3) ausgegeben, sondern alle vier
auf einmal, weil der CommonPool jeden blockierenden Thread durch einen
neuen Thread kompensiert hat. Nach 1,5 Sekunden werden die nicht blockierenden
Werte (102, 101, 103, 100) ausgegeben. Daran ist neben dem
myThread
,
der die Ausführung des Streams angestoßen hat, ist nur ein Thread des
CommonPools (
worker-1
) beteiligt, weil die
nicht blockierende Funktionalität (anders als die blockierende Funktionalität)
vom CommonPool nicht kompensiert wird.
Schauen wir uns nun an, wie wir die Implementierung
der blockierenden Funktionalität aus dem Beispiel von
test02()
,
Zeile 16ff für die Benutzung geändert haben. Zuerst muss dazu das blockierende
Thread.sleep(1000)
in einen
ManagedBlocker
gelegt werden:
private static class IntBlocker implements ForkJoinPool.ManagedBlocker { private final int value;
private volatile boolean done;
public IntBlocker(int value) { this.value = value;
}
public boolean block() throws InterruptedException { if (done == false) { Thread.sleep(1000); done = true; } return true;
}
public boolean isReleasable() { return done;
}
public int getValue() { return value; }
}
Die Wrapper-Klasse
IntBlocker
können
wir nun in der
map
()
-Operation
nutzen:
IntStream.range(0,4).parallel() .map(i -> { IntBlocker blocker = new IntBlocker(i); try { ForkJoinPool.managedBlock(blocker); } // Zeile 4 catch (InterruptedException e) { } return blocker.getValue(); })
.forEach(i -> System.out.println(Thread.currentThread().getName()
+ ": blocking - " + i));
Wichtig ist dabei, dass wir die statische
Methode
managedBlock()
des
ForkJoinPools
aufrufen (Zeile 4); sie ist der Trigger für die Kompensationsfunktionalität
des Pools.
Von der Architektur her ist diese Lösung sicher der beste Ansatz: • Die Anzahl der Threads passt sich dynamisch abhängig von der Anzahl der blockierten Threads an. Dies ist besser, als die Parallelität des CommonPools fest zu verändern.
•
Es
wird weiter der CommonPool genutzt, auf den die Implementierung der parallelen
Streams aufsetzt.
Einen Nachteil dieser Lösung haben wir schon
gesehen: Die Implementierung ist im Vergleich zu den vorher diskutierten
Lösungsansätzen relativ aufwändig. Man muss die blockierende Funktionalität
in einen
ManagedBlocker
einpacken und
diese Wrapper-Klassen dann entsprechend nutzen.
Ein weiteres potentielles Problem dieses
Ansatzes liegt in der Kompensation an sich. Doug Lea hat das Problem
so beschrieben: „make a choice between […] either locking up vs blowing
up programs” (siehe /
DLMA
/). Gemeint ist damit
Folgendes: Um blockierende Threads (
locking up
) zu kompensieren,
werden neue Threads gestartet. Wenn diese aber auch wieder blockieren
und kompensiert werden müssen, werden noch mehr neue Threads gestartet.
Dies kann soweit gehen, dass das Programm an seine Ressourcegrenzen stößt
und abgebrochen wird (
blowing up
). Ressourcegrenzen können dabei
Dinge wie zum Beispiel die maximale Anzahl der Threads oder der maximale
Speicher sein. Entsprechend ist es auch nicht so, dass jeder blockierte
Thread immer sofort durch einen neuen Thread ersetzt wird. Die Javadoc
von
ForkJoinThread.managedBlock()
sagt
dazu: „
possibly
arranges for a spare thread to be activated if
necessary to ensure sufficient parallelism while the current thread is
blocked“ (siehe /
JDMB
/). Eine ausführliche Diskussion
des Problems durch Doug Lea findet sich unter /
DLMA
/.
Wie hoch ist die Gefahr, mit parallelen Streams
an Ressourcengrenzen zu stoßen? Bisher haben wir in den Beispielen Streams
verwendet, die nicht mehr als vier Elemente enthielten. Deshalb bestand
bei diesen Beispielen die Gefahr überhaupt nicht. Was ist aber, wenn
die Streams sehr groß werden? Auch dann gibt es bei den parallelen Streams
eine Beschränkung dadurch, dass die Anzahl der parallel auszuführenden
Tasks begrenzt ist. Die Zahl der Tasks richtet sich nach der Parallelität
des CommonPools, im Default-Fall also nach der Anzahl der Cores auf der
Plattform. Sie ist im Default-Fall die Zweier-Potenz, die größer oder
gleich 4*(Anzahl Cores – 1) ist. Das sind bei einem Dual-Core vier
parallele Tasks, auf einem Quad-Core 16 parallele Tasks, usw. . Man sieht
also, dass auf typischen Desktop-Systemen die maximale Anzahl paralleler
Tasks ziemlich übersichtlich bleibt, und auch auf den meisten aktuellen
Server-Systemen dürfte die Zahl immer noch im dreistelligen Bereich liegen.
Schauen wir uns ein Beispiel an, bei dem
die Beschränkung durch die maximale Anzahl der parallelen Tasks entsteht
und nicht wie bisher durch die Anzahl der Streamelemente. Wir können
dazu einfach das vorhergehende Beispiel nehmen und das Interval in der
range()
-Methode
so wählen, das der Stream sowohl für die blockierende als auch für die
nicht-blockierende Funktionalität acht (statt vier) Elemente groß ist:
Runnable task = () -> { try { Thread.sleep(1500); } catch (InterruptedException e) { e.printStackTrace();
}
IntStream. range(100, 10 8 ) .parallel() .forEach(i -> System.out.println(Thread.currentThread().getName() + ": nonblocking - " + i));
};
new Thread(task,"myThread").start();
IntStream. range(0,8) .parallel() .map(i -> { IntBlocker blocker = new IntBlocker(i); try { ForkJoinPool.managedBlock(blocker); } // Zeile 4 catch (InterruptedException e) { } return blocker.getValue(); })
.forEach(i -> System.out.println(Thread.currentThread().getName()
+ ": blocking - " + i));
Wenn wir den Code laufen lassen, sieht die Ausgabe nun so aus:
Abbildung 8: Ausgabe bei Ablauf zweier
konkurrierender paralleler Streams mit acht Elementen (blockierend mit
ManagedBlocker
)
Nach einer Sekunde werden die ersten vier der blockierenden Zahlen ausgegeben (2, 4, 0, 6). Es sind nur vier, weil der Stream auf unserem Dual-Core-System in vier parallel laufende Tasks mit je zwei Elemente aufgeteilt wurde. Die vier ausgegebenen Zahlen (2, 4, 0, 6) sind jeweils die ersten Elemente dieser vier Tasks. Nach 1,5 Sekunden werden alle nicht-blockierenden Zahlen ausgegeben (104, 105, 102,103, 100, 101, 106, 107). Da sie nicht-blockierend sind, sieht es in der Ausgabe so aus, dass sie alle auf einmal herauskommen. Ausgegeben werden sie von dem Thread, der die Stream-Operation angestoßen hat ( myThread ), und dem CommonPool-Thread worker-3 . Letzterer ist ein Kompensationsthread, da alle anderen CommonPool-Threads ( worker-0 bis 2 ) gerade per ManagedBlocker blockiert sind.
Nach zwei Sekunden, wenn der
sleep()
der blockierenden Funktionalität abgelaufen ist, werden die letzen vier
blockierenden Zahlen ausgegeben (3, 5, 1, 7).
Wie man sieht, hat der CommonPool nicht nur einen Worker-Thread, wie es defaultmäßig auf unserer Dual-Core-Plattform wäre, sondern wegen dem ManagedBlocker sind es vorübergehend vier Worker-Threads. Mehr Threads werden es aber in diesem Beispiel nicht, weil die Stream-Operation in nur vier parallele Tasks zerlegt wurde. Zusammenfassung
Die JDK-Designer haben die parallelen Streams
in Java 8 so konzipiert, dass sie für CPU-intensive Funktionalität gut
funktionieren. Trotzdem gibt es immer wieder den Wunsch von Java-8-Benutzern,
parallele Streams auch für blockierende Funktionalität zu verwenden.
Wir haben uns in diesem Artikel angesehen, welche Möglichkeiten es dafür
gibt.
Auf den JAX-Konferenzen 2014 (/ BGJ8 /) und 2015 hat Brian Goetz erwähnt, dass es in Zukunft eine bessere Unterstützung für blockierende Funktionalität bei den Streams geben soll. So wie es heute aussieht, wird es diese Unterstützung aber vermutlich noch nicht in Java 9 geben. Literaturverweise
Die gesamte Serie über Java 8:
|
|||||||||||||||||||||||||||||||||||||
© Copyright 1995-2018 by Angelika Langer. All Rights Reserved. URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/85.Java8.Streams-and-Blocking-Functionality/85.Java8.Streams-and-Blocking-Functionality.html> last update: 26 Oct 2018 |