Angelika Langer - Training & Consulting
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | CONTACT | Twitter | Lanyrd | Linkedin
 
HOME 

  OVERVIEW

  BY TOPIC
    JAVA
    C++

  BY COLUMN
    EFFECTIVE JAVA
    EFFECTIVE STDLIB

  BY MAGAZINE
    JAVA MAGAZIN
    JAVA SPEKTRUM
    JAVA WORLD
    JAVA SOLUTIONS
    JAVA PRO
    C++ REPORT
    CUJ
    OTHER
 

GENERICS 
LAMBDAS 
IOSTREAMS 
ABOUT 
CONTACT 
Effective Java

Effective Java
Java 8
Streams und Blockierende Funktionalität
 

Java Magazin, März 2016
Klaus Kreft & Angelika Langer

Dies ist die Überarbeitung eines Manuskripts für einen Artikel, der im Rahmen einer Kolumne mit dem Titel "Effective Java" im Java Magazin erschienen ist.  Die übrigen Artikel dieser Serie sind ebenfalls verfügbar ( click here ).

 
 
 

Diesmal wollen wir uns genauer ansehen, wie parallele Streams mit blockierender Funktionalität umgehen und welche Möglichkeiten man als Stream-Nutzer hat, sie dabei zu beeinflussen.

Die Ausgangssituation

In der Software-Architektur gibt es das grundsätzliche Prinzip, den Durchsatz bei blockierender Funktionalität durch die Verwendung von mehreren parallelen Threads zu erhöhen.  Typische Beispiele für blockierende Funktionalität sind synchrone Netzwerk-I/O über HTTP, synchrone Datenbankzugriffe über JDBC, usw. .  Verwendet man mehrere Threads, um diese blockierende Funktionalität parallel auszuführen, so lässt sich im Allgemeinen der Durchsatz steigern.  Um solche Lösungen mit mehreren parallelen Threads zu implementieren, ist einiges an Design- und Implementierungsaufwand erforderlich.  Außerdem braucht man ausreichendes Know-How im Bereich von Multi-Thread-Programming, z. B. zum Umgang mit Thread-Pools, zur Ergebnis-Synchronisation, usw. .

 
 

Jetzt, da parallele Streams ein Teil der JDKs sind, gibt es die Hoffung, dass sich solche Lösungen deutlich einfacher implementieren lassen: Man übergibt einfach die blockierende Funktionalität in einem Lambda-Ausdruck an eine geeignete Stream-Operation eines parallelen Streams. 
 
 

Leider sind parallele Streams nicht uneingeschränkt für eine solche Benutzung geeignet.  Wo die Probleme einer solchen Lösung sind und welche zusätzlichen Möglichkeiten man als Stream-Nutzer hat, diese Probleme zu umgehen bzw. zu minimieren, wollen wir uns im folgenden Artikel im Detail ansehen.

Ein Beispiel

Schauen wir uns zuerst ein konkretes Beispiel für blockierende Funktionalität an.  Wir wollen uns mit Hilfe eines HTTP-Requests Informationen (Symbol, Aktienkurs, sowie Veränderung des Kurses) zu dem Tickersymbol eines börsennotierten Unternehmens holen:

 
 

public static String getStockInfo(String s) {
    try {
        URL yahoofinance = new URL("http://finance.yahoo.com/d/quotes.csv?s="+s +"&f=sl1c");
        URLConnection yc = yahoofinance.openConnection();
        try (BufferedReader in = new BufferedReader(new InputStreamReader(yc.getInputStream()))) {
            String result = in.readLine();
            return result;
        }
    } catch (Exception e) { return null; }
}
 
 

Diese blockierende Funktionalität übergeben wir in einem Lambda-Ausdruck an die  map() Operation eines Streams.  Das heißt, in der  map() -Operation wird das Tickersymbol ( String ) auf einen Ergebnis- String abgebildet, der Symbol, Aktienkurs, sowie Veränderung des Kurses enthält.  Hier ist die gesamte Stream-Verarbeitung:
 
 

private static String[] stockSymbols = {"GOOG", "AAPL", "MSFT", "YHOO"};
    private static void test01() {
    Arrays. stream ( stockSymbols )//.parallel()
          .map(s -> getStockInfo (s))
          .peek(d -> System. out .println(Thread. currentThread ().getName()+":"+d))
          .map(s -> createStockData (s))
          .filter(s -> s != null)
          .reduce((s1,s2) -> s1.getChange() > s2.getChange()? s1 : s2)
          .ifPresent(s -> System. out .println(s));

}
 
 

Als Stream-Source verwenden wir ein Array mit vier Tickersymbolen.  Nach der bereits oben beschriebenen Abbildung des Tickersymbols auf einen  String mit den weiteren Aktieninformationen geben wir diesen String zusammen mit dem Namen des verantwortlichen Threads in einem  peek() auf  System.out aus.  Danach bilden wir den  String mit der Aktieninformation auf ein  StockData Objekt ab.  StockData ist eine Klasse mit drei Feldern:
 
 

private String corpSym; // Tickersymbol
private float price;    // Aktienkurs
private float change;   // Veränderung des Aktienkurs gegenüber dem Vortrag
 
 

sowie einem Konstruktor, der die drei Felder setzt und je einer Getter -Methode für jedes Feld.  Die Methode  createStockData() macht also nichts anderes, als den  String mit der Aktieninformation zu parsen und mit den dabei ermittelten Werten den  StockData -Konstruktor aufzurufen.  Den vollständigen Source-Code von  createStockData() und  StockData zeigen wir aus Platzgründen nicht in diesem Artikel.  Wer sich für den vollständigen Source-Code interessiert, findet ihn unter / SRC /.  Man kann den Source-Code auch herunterladen, um das Beispiel selbst am eigenen Rechner laufen zu lassen und mit den Effekten zu experimentieren.
 
 

Die anschließende  filter() -Operation wird benötigt, um  null -Werte aus dem Stream zu entfernen.  Sie können entstanden sein, weil sowohl  getStockInfo() als auch  createStockData() im Fehlerfall auf  null abbilden.  Als terminale Operation  wird am Ende in einem  reduce() die Aktie mit der größten Wertsteigerung ermittelt und ausgegeben.
 
 

Schauen wir uns nun an, wie die Ausgabe aussieht, wenn wir das Programm auf einem Dual-Core-Rechner ablaufen lassen.  Zuerst mit einem sequentiellen Stream:

Abbildung 1: Ausgabe bei Ablauf mit einem sequentiellen Stream
 
 

Jede Zeile, die im  peek() ausgegeben wird, erscheint mit einer gewissen Verzögerung in der Ausgabe.  Das liegt an der Latenzzeit des blockierenden HTTP-Requests.  Die letzte  peek() -Ausgabe erscheint zusammen mit dem Ergebnis in der Ausgabe.  Die Unterbrechungen in dem schwarzen Strich vorne vor der Ausgabe sollen genau dieses Anhalten der Ausgabe nach jeder der ersten drei Zeilen illustrieren.  In der Ausgabe der  peek() -Aufrufe können wir auch sehen, dass alle HTTP-Request-Aufrufe vom  main -Thread ausgeführt worden sind.
 
 

Und wie sieht das Ganze nun aus, wenn wir den Code mit einem parallelen Stream auf unserem Dual-Core-Rechner ausführen?  Dazu müssen wir im Code oben den  parallel() -Aufruf nach dem Erzeugen des Stream reinkommentieren.  Die Ausgabe ist dann:

Abbildung 2: Ausgabe bei Ablauf mit einem parallelen Stream
 
 

Jetzt werden immer zwei  peek() -Aufrufe zusammen ausgegeben und beim zweiten Paar wird auch die Ausgabe des Ergebnisses noch gleich mit angehängt.  Es werden je zwei  peek() -Aufrufe zusammen ausgegeben, weil jetzt zwei Threads ( main und  ForkJoinPool.commonPool-worker-1 ) parallel den Stream abarbeiten. 
 
 

Weil wir einen Dual-Core-Rechner verwenden haben, werden bei einem parallelen Stream default-mäßig zwei Threads verwendet.  Parallele Streams sind so implementiert, dass sie im Default-Fall auf einer Multi-Core-Plattform immer so viele Threads verwenden, wie sie Prozessor-Cores mit Hilfe von  java.lang.Runtime.availableProcessors() auf der Plattform finden.  Einer der Threads ist dabei der Thread, der die Stream-Operation angestoßen hat. Hier in unserem Beispiel ist es der  main -Thread.  Die anderen Threads werden vom  java.util.concurrent.ForkJoinPool.commonPool() zur Verfügung gestellt. Der CommonPool ist ein Singleton-Thread-Pool, der mit Java 8 in den JDK aufgenommen wurde. In unserem Fall wird vom CommonPool nur ein Thread ( ForkJoinPool.commonPool-worker-1 ) zur Verfügung gestellt, weil wir eine Dual-Core-Plattform verwenden.  Mehr zu parallelen Streams und dem CommonPool findet sich in einem unserer vorhergehenden Artikel (/ KLS1 /).
 
 

Wir haben jetzt also mit Hilfe des parallelen Streams die Software-Architektur implementiert, die wir oben bereits theoretisch beschrieben hatten. Der Durchsatz der HTTP-Requests hat sich dank des parallelen Streams im Vergleich zum sequentiellen Stream erhöht (etwa verdoppelt), und der Implementierungsaufwand dafür ist minimal (lediglich ein zusätzlicher Aufruf von  parallel() )  Wenn wir eine Plattform mit noch mehr Prozessor-Cores verwenden würden, könnten wir den Durchsatz sogar noch weiter steigern, weil dann default-mäßig noch mehr Threads bei Verwendung eines parallelen Streams zur Verfügung stehen würden. 

Das Problem

Leider hat diese Lösung ein Problem, das aber nicht ganz offensichtlich ist.  Was passiert, wenn wir in unserer JVM weitere parallele Streams nutzen, kurz nachdem wir die Methode  test01() mit einem parallelen Stream aufgerufen haben?  Die Funktionalität der nachfolgend gestarteten parallelen Streams kann den gemeinsam genutzten Thread des CommonPools nicht verwenden, denn dieser muss die blockierende Funktionalität der HTTP-Requests aus  test01() verarbeiten -  wobei das "Verarbeiten" im Wesentlichen darin besteht, dass der CommonPool-Thread in dem blockierenden HTTP-Request wartet.

 
 

Schauen wir uns dazu ein weiteres Beispiel an, das die Situation noch ein wenig kontrollierter auf den Punkt bringt:
 
 

private static void test02() {
 
 

   Runnable task = () -> {                       // Zeile 2

            try {

                Thread.sleep(1500);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }             
 
 

            IntStream.range(100, 104).parallel()

                    .forEach(i -> System.out.println(Thread.currentThread().getName() +

                                                               ": nonblocking - " + i));

        };
 
 

   new Thread(task,"myThread").start();           // Zeile 14
 
 

   IntStream.range(0,4).parallel()                // Zeile 16

                       .map(i -> {

                                    try {

                                       Thread.sleep(1000);

                                    } catch (InterruptedException e) {

                                       e.printStackTrace();

                                    }

                                    return i;

                                  })

                       .forEach(i -> System.out.println(Thread.currentThread().getName() +

                                                                  ": blocking - " + i));

}
 
 

Die wartende Funktionalität besteht diesmal in einem  sleep() von einer Sekunde, das beim Abbilden eines  int -Wertes (0 … 3) auf sich selbst auftritt.  Danach wird der  int -Wert ausgegeben.  Das Ganze ist ab Zeile 16 ff. implementiert.
 
 

Parallel dazu läuft der Thread  myThread . Er wartet 1,5 Sekunden, bevor er dann nicht-blockierend die  int -Werte von 100 bis 103 ausgibt (implementiert in Zeile 14 bzw. Zeile 2 ff.).  Zusätzlich werden bei allen Ausgaben die Thread-Namen mit ausgegeben.  Wie sieht nun die Ausgabe aus?

Abbildung 3: Ausgabe bei Ablauf zweier konkurrierender paralleler  Streams
 
 

Nach einer Sekunde werden die 2 und die 1 aus der blockierenden Funktionalität durch den  main -Thread und den einen Thread des CommonPools ausgegeben.  Nach 1,5 Sekunden werden 102, 103, 101, 100 von dem  myThread ausgegeben.  Der Thread des CommonPools kann sich nicht an der Ausgabe beteiligen, da er im  sleep() der blockierenden Funktionalität wartet.  Man sieht an der Tatsache, dass die Reihenfolge der  int -Werte nicht eingehalten wird, dass die Abarbeitung der nicht-blockierenden Funktionalität im Parallel-Modus erfolgt ist - aber mit nur einem Thread!  Nach zwei Sekunden werden die 3 und die 0 aus der blockierenden Funktionalität durch den  main -Thread und den einen Thread des CommonPools ausgegeben. 
 
 

Wir wollen das Ergebnis noch einmal kurz verallgemeinert zusammenfassen:  Blockierende Funktionalität in Streams bindet die gemeinsam genutzten CommonPool-Threads für ihre Aufgaben, die im Wesentlichen aus Warten bestehen.  Trotzdem werden nachfolgende parallele Stream-Aufrufe ausgeführt.  Es ist nicht so, dass nachfolgende Stream-Aufrufe abwarten müssen, bis die blockierende Funktionalität fertig geworden ist.  Allerdings werden nachfolgende parallele Stream-Aufrufe nicht parallel, sondern von nur von einem Thread ausgeführt, nämlich dem Thread, der die nachfolgende parallele Stream-Operation angestoßen hat.

Lösungsansätze

Welche Möglichkeiten hat man nun als Stream-Nutzer, in das Thread-Management der parallelen Streams einzugreifen? Grundsätzlich erst einmal relativ wenig. 

 
 

Das liegt daran, dass die Designmaxime bei der Entwicklung der Streams war: Streams und insbesondere parallele Streams sollen möglichst einfach zu benutzen sein.  Konfigurierbarkeit stand deshalb nicht im Vordergrund.  Es kommt noch hinzu, dass umfangreiche Konfigurierbarkeit leicht zu Fehlkonfigurationen führen kann.  Dies wollte man aber auf jeden Fall vermeiden, damit das neue Feature Streams kein schlechtes Image bekommt. Entsprechend sind parallele Streams vorwiegend für CPU-intensive Funktionalität und nicht für blockierende Funktionalität gedacht.  Wie gut parallele Streams zu CPU-intensiver Funktionalität passen, haben wir übrigens bereits in einem vorhergehenden Artikel über Stream-Performance diskutiert (/ KLS2 /).
 
 

Es gibt also nicht den einen Schalter, um parallele Streams an blockierende Funktionalität anzupassen.  Aber es gibt einige Möglichkeiten einzugreifen. Teilweise handelt es sich dabei richtiggehend um Hacks, die so nie vorgesehen waren.
 
 

Wir schauen uns die Möglichkeiten im Folgenden genauer an.  Dabei haben wir es so gemacht, dass wir mit den weniger aufwändigen Möglichkeiten beginnen.

Parallelität des CommonPool verändern

Wir haben schon oben erwähnt, dass im Default-Fall für die Bearbeitung eines parallelen Streams auf einer Multi-Core-Plattform immer so viele Threads verwenden, wie Prozessor-Cores auf der Plattform vorhanden sind.  Diese Threads sind:
  • der Thread, der die Stream-Operation anstößt und
  • Anzahl Cores – 1) viele Threads im CommonPool.

  •  
     

    Genau genommen ist die Anzahl der CommonPool-Threads nicht fix.  In Ruhephasen werden die Threads sukzessive beendet und unter Last werden sie wieder gestartet.  Auch gibt es Ausnahmesituationen, in denen der CommonPool mehr als (Anzahl Cores – 1) viele Threads enthalten kann.  Wir schauen uns diese spezielle Situationen unten im Absatz " ForkJoinPool.ManagedBlocker nutzen" an.
     
     

    Da die Anzahl der CommonPool-Threads nicht fix ist, spricht die Javadoc des CommonPools bzw.  ForkJoinPool s nicht von der "Anzahl der Threads des Pools" sondern von der "Parallelität des Pools" (Englisch: parallelism ).
     
     

    Der Default-Wert der Parallelität des CommonPools ist wie bereits gesagt auf einer Mulit-Core-Plattform: Anzahl Cores – 1. Die Parallelität lässt sich mit Hilfe der System Property  java.util.concurrent.ForkJoinPool.common.parallelism beim Start der JVM auf einen anderen Wert setzen.
     
     

    Was geschieht nun, wenn wir in unserem parallelen Tickersymbol-Beispiel ( test01() ) die Parallelität des CommonPools auf fünf erhöhen, also die JVM mit dieser Option starten:
     
     

    -Djava.util.concurrent.ForkJoinPool.common.parallelism=5
     
     

    Die entsprechende Ausgabe sieht dann so aus:

    Abbildung 4: Ausgabe bei Ablauf mit CommonPool-Parallelität=5
     
     

    Jetzt kommen alle Ausgaben zugleich heraus, da sechs Thread für den parallelen Stream zur Verfügung stehen.  In der Ausgabe können wir sehen, dass nur vier Threads benötigt wurden, weil unsere Streamsource nur vier Elemente enthält.  Das heißt, zwei Pool-Threads stehen trotz blockierender Funktionalität im CommonPool für weitere parallele Streams zur Verfügung.
     
     

    In unserem kleinen Beispiel war die Erhöhung der Parallelität des CommonPools erfolgreich.  Wir führen unsere blockierende Funktionalität mit Hilfe eines parallelen Streams aus, ohne dass wir alle Threads des CommonPools blockieren.  Das liegt natürlich daran, dass der Stream klein genug war.  Wäre das Array mit den Tickersymbolen deutlich größer, dann hätten wir alle Threads des CommonPools blockiert.  Dieser Lösungsansatz über die Erhöhung der Parallelität des CommonPools funktioniert also nur für relativ kleine parallele Streams.
     
     

    Die Erhöhung der Anzahl der CommonPool-Threads hat noch einen weiteren kleinen Nachteil.  Durch die höhere Anzahl der Threads kommt es zu mermehrtem Thread-Scheduling, was ein wenig die Performance verringert.  Das ist übrigens der Grund für die relativ geringe Default-Anzahl der Threads im CommonPool.

    Dedizierten  ForkJoinPool nutzen

    Bei Workshops und Kursen kommt bei diesem Thema meist die Frage:  Kann man nicht einfach einen anderen Thread-Pool anstelle des CommonPools für die blockierende Funktionalität nutzen?  Das wäre doch eine ganz gute Lösung.  Die CPU-intensive Funktionalität nutzt weiter gemeinsam den CommonPool und für die blockierende Funktionalität gibt es einen dedizierten neuen Pool.  Auf diese Weise kommen sich die beiden unterschiedlichen Typen von Funktionalität nicht in die Quere.  Von der Architektur ist es eine überzeugende und intuitive Lösung.

     
     

    Leider gibt es offiziell (Javadoc, Stream-Design, usw.) keine solche Möglichkeit.  Aber es gibt einen Hack, der auch auf einschlägigen Internetseiten (zum Beispiel hier: / SOCP /) beschrieben wird.  Schauen wir uns zuerst an, wie es geht und diskutieren danach diesen Ansatz.
     
     

    Die Idee ist, dass das Aufrufen der Stream-Funktionalität als ein  Runnable oder  Callable an einen eigenen  ForkJoinPool übergeben wird.  Mit anderen Worten, wir sorgen dafür, dass der Thread, der die parallele Stream-Operation anstößt, bereits ein Fork-Join-Pool-Thread ist.  Das führt dazu, dass die Threads dieses ForkJoinPools dann die Stream-Operationen ausführen und der CommonPool gar nicht verwendet wird.  Dieser Effekt ergibt sich wegen der internen, privaten Methode  ForkJoinTask.doInvoke() .  Sie prüft, ob sie von einem  ForkJoinWorkerThread ausgeführt wird. 
     

    • Falls nein, delegiert sie das weitere Ausführen an den CommonPool. Das geschieht bei normalen parallelen Stream-Aufrufen. 
    • Falls ja, delegiert sie das weitere Ausführen an den  ForkJoinPool , aus dem der  ForkJoinWorkerThread stammt.  Das ist das, was wir jetzt ausprobieren wollen.

     

    Die Implementierung für unser Tickersymbol-Beispiel ist relativ einfach:
     
     

    Runnable task = () -> Arrays. stream ( stockSymbols ).parallel()
            .map(s -> getStockInfo (s))
            .peek(d -> System. out .println(Thread. currentThread ().getName()+": "+d))
            .map(s -> createStockData (s))
            .filter(s -> s != null)
            .reduce((s1,s2) -> s1.getChange() > s2.getChange() ? s1 : s2)
            .ifPresent(s -> System. out .println(s));

    ForkJoinPool myPool = new ForkJoinPool(4);
    myPool.submit(task);
     
     

    Auf dem parallelen Stream führen wir die gleiche Funktionalität wie in der Methode  test01() aus.  Wir müssen diese Funktionalität jetzt aber als  Task formulieren, die an den  myPool mit  submit() übergeben wird.  Unseren  ForkJoinPool statten wir mit einer Parallelität von vier aus (siehe Konstruktorparameter).   Der CommonPool hat dagegen nur eine Parallelität von eins auf unserer Dual-Core-Umgebung. Weiter ist zu beachten, dass man sich anders als beim CommonPool um das Herunterfahren des  myPool kümmern muss.  Wir haben das Herunterfahren im obigen Beispielcode aber aus Gründen der Vereinfachung nicht gezeigt.
     
     

    Die Ausgabe ist diesmal:

    Abbildung 5: Ausgabe bei Ablauf mit dediziertem  ForkJoinPool
     
     

    Alle Zeilen werden wieder auf einen Schlag ausgegeben, weil wir unserer ForkJoinPool-Instanz eine Parallelität von vier gegeben haben.  Der Pool hat default-mäßig den Namen  ForkJoinPool-1 bekommen und wir sehen, dass seine vier Threads den Stream abarbeiten.
     
     

    Auch wenn diese Lösung mit unserem Beispiel funktioniert, hat dieser Ansatz das Problem, dass es ein Hack ist.  Das zeigt sich zum Beispiel daran, dass einige interne Dinge nicht passen.  So nutzt der parallele Stream (genauer gesagt die  AbstractTask ) immer die Parallelität des CommonPools, um die Anzahl der Zerlegungen der Fork-Join-Tasks zu berechnen. Dies geschieht auch, wenn der Stream auf einem ganz anderen  ForkJoinPool läuft, wie in dem Beispiel oben.  Die Zerlegung wird in unserem Beispiel mit Parallelism=1 (aus dem CommonPool) berechnet, aber hinterher mit Parallelism=4 (aus dem eigenen ForkJoinPool) ausgeführt.  Es passt nicht zusammen und macht wenig Sinn, führt aber anderseits auch nicht gleich zu katastrophalen Problemen.  Man sollte einfach darauf vorbereitet sein, dass mit dem gezeigten Hack nicht alles so läuft, wie normalerweise mit dem CommonPool.
     
     

    Es ist auch zu beachten, dass dieser Hack möglicherweise irgendwann nicht mehr funktionieren wird.  Brian Goetz, der Chefarchitekt für dieses Thema bei Oracle, hat auf der JAX-Konferenz 2014 in einer Fragestunde zu Thema Java 8 zu diesem Hack gesagt: „I don’t promise that it will work that way for the rest of time.“  In dem Video der Fragestunde (/ BGJ8 /) findet man weitere Hintergründe, sowie seine Einordnung des Hacks (Minute 30:50 bis 33:50 im Video).
     
     

    Ergänzend kann man zu dieser Lösung noch sagen, dass sie möglicherweise als eine Art Architekturstudie dienen kann, um danach die blockierende Funktionalität mit Hilfe einer eigenen  ForkJoinTask –Implementierung (also ohne Streams) selbst zu implementieren.  Im Gegensatz zur Stream-Lösung ist das dann natürlich deutlich aufwändiger und erfordert entsprechendes Fork-Join-Framework Know-How.  Dafür hat man dann aber eine stabile, zukunftssichere Lösung.  Wie so etwas grundsätzlich mit dem Fork-Join-Framework geht, haben wir in zwei früheren Artikeln bereits diskutiert (/ KLS3 /, / KLS4 /).

    ForkJoinPool.ManagedBlocker nutzen

    Bei dem letzten Lösungsansatz, den wir uns anschauen wollen, geht es wieder darum, die Anzahl der Threads im CommonPool zu erhöhen.  Diesmal wollen wir sie aber nicht permanent erhöhen.  Stattdessen wollen wir ein Feature des  ForkJoinPools nutzen, mit dem der Pool die mit wartender Funktionalität blockierten Threads vorübergehend - für die Dauer ihrer Wartezeit - durch neue Pool-Threads kompensiert.  Diese interessante Funktionalität des  ForkJoinPools wird selten erwähnt, ist aber gerade in unserer Situation sehr nützlich.  Da der CommonPool eine spezielle Instanz des  ForkJoinPool s ist, stellt auch er diese Funktionalität zur Verfügung.

     
     

    Der  ForkJoinPool hat ein nested Interface  ManagedBlocker . Die blockierende Funktionalität muss dieses Interface implementieren und dem Pool bekannt geben, so dass dieser das Kompensationsfeature für die entsprechende Funktionalität aktiviert.  Wie die Implementierung im Detail aussieht, schauen wir uns weiter unten an.  Zuerst wollen wir uns ansehen, wie die Ausgabe im Falle unseres Tickersymbol-Beispiels nun aussieht:

    Abbildung 6: Ausgabe bei Ablauf mit  ManagedBlocker
     
     

    Die Ausgabe sieht genauso aus wie zuvor in Abbildung 4.  Dort hatten wir selbst als Nutzer die Parallelität und damit die Anzahl der Threads des CommonPools erhöht.  Dieses Mal erhöht der CommonPool von sich aus die Anzahl der Threads, da er wegen der Verwendung des  ManagedBlockers wartende Threads durch neu gestartete Threads kompensiert.
     
     

    Schauen wir uns auch noch an, wie die Ausgabe für das Beispiel in  test02() aussieht, wenn man die blockierende Funktionalität mit Hilfe von  ManagedBlocker ausdrückt:

    Abbildung 7: Ausgabe bei Ablauf zweier konkurrierender paralleler  Streams (blockierend mit  ManagedBlocker )
     
     

    Nach einer Sekunde werden nicht nur zwei der vier blockierenden Werte (2, 1, 0, 3) ausgegeben, sondern alle vier auf einmal, weil der CommonPool jeden blockierenden Thread durch einen neuen Thread kompensiert hat.  Nach 1,5 Sekunden werden die nicht blockierenden Werte (102, 101, 103, 100) ausgegeben.  Daran ist neben dem  myThread , der die Ausführung des Streams angestoßen hat, ist nur ein Thread des CommonPools ( worker-1 ) beteiligt, weil die nicht blockierende Funktionalität (anders als die blockierende Funktionalität) vom CommonPool nicht kompensiert wird.
     
     

    Schauen wir uns nun an, wie wir die Implementierung der blockierenden Funktionalität aus dem Beispiel von  test02() , Zeile 16ff für die Benutzung geändert haben.  Zuerst muss dazu das blockierende  Thread.sleep(1000) in einen  ManagedBlocker gelegt werden:
     
     

      private static class IntBlocker implements ForkJoinPool.ManagedBlocker {

         private final int value;

         private volatile boolean done;
     
     

         public IntBlocker(int value) {

            this.value = value;

         }
     
     

         public boolean block() throws InterruptedException {

            if (done == false) {

               Thread.sleep(1000);

               done = true;

            }

            return true;

         }
     
     

         public boolean isReleasable() {

            return done;

         }
     
     

         public int getValue() {

            return value;

         }

      }
     
     

    Die Wrapper-Klasse  IntBlocker können wir nun in der  map () -Operation nutzen:
     
     

    IntStream.range(0,4).parallel()

             .map(i -> {

                    IntBlocker blocker = new IntBlocker(i);

                    try { ForkJoinPool.managedBlock(blocker); }  // Zeile 4

                    catch (InterruptedException e) { }

                    return blocker.getValue();

             })

             .forEach(i -> System.out.println(Thread.currentThread().getName() + ": blocking - " + i));
     
     

    Wichtig ist dabei, dass wir die statische Methode  managedBlock() des  ForkJoinPools aufrufen (Zeile 4); sie ist der Trigger für die Kompensationsfunktionalität des Pools.
     
     

    Von der Architektur her ist diese Lösung sicher der beste Ansatz: 

    Die Anzahl der Threads passt sich dynamisch abhängig von der Anzahl der blockierten Threads an.  Dies ist besser, als die Parallelität des CommonPools fest zu verändern. 

    Es wird weiter der CommonPool genutzt, auf den die Implementierung der parallelen Streams aufsetzt.
     
     

    Einen Nachteil dieser Lösung haben wir schon gesehen:  Die Implementierung ist im Vergleich zu den vorher diskutierten Lösungsansätzen relativ aufwändig.  Man muss die blockierende Funktionalität in einen  ManagedBlocker einpacken und diese Wrapper-Klassen dann entsprechend nutzen.
     
     

    Ein weiteres potentielles Problem dieses Ansatzes liegt in der Kompensation an sich.  Doug Lea hat das Problem so beschrieben: „make a choice between […] either locking up vs blowing up programs” (siehe / DLMA /).  Gemeint ist damit Folgendes: Um blockierende Threads ( locking up ) zu kompensieren, werden neue Threads gestartet.  Wenn diese aber auch wieder blockieren und kompensiert werden müssen, werden noch mehr neue Threads gestartet.  Dies kann soweit gehen, dass das Programm an seine Ressourcegrenzen stößt und abgebrochen wird ( blowing up ).  Ressourcegrenzen können dabei Dinge wie zum Beispiel die maximale Anzahl der Threads oder der maximale Speicher sein. Entsprechend ist es auch nicht so, dass jeder blockierte Thread immer sofort durch einen neuen Thread ersetzt wird.  Die Javadoc von  ForkJoinThread.managedBlock() sagt dazu: „ possibly arranges for a spare thread to be activated if necessary to ensure sufficient parallelism while the current thread is blocked“ (siehe / JDMB /).  Eine ausführliche Diskussion des Problems durch Doug Lea findet sich  unter / DLMA /.
     
     

    Wie hoch ist die Gefahr, mit parallelen Streams an Ressourcengrenzen zu stoßen?  Bisher haben wir in den Beispielen Streams verwendet, die nicht mehr als vier Elemente enthielten.  Deshalb bestand bei diesen Beispielen die Gefahr überhaupt nicht.  Was ist aber, wenn die Streams sehr groß werden?  Auch dann gibt es bei den parallelen Streams eine Beschränkung dadurch, dass die Anzahl der parallel auszuführenden Tasks begrenzt ist.  Die Zahl der Tasks richtet sich nach der Parallelität des CommonPools, im Default-Fall also nach der Anzahl der Cores auf der Plattform.  Sie ist im Default-Fall die Zweier-Potenz, die größer oder gleich 4*(Anzahl Cores – 1) ist.  Das sind bei einem Dual-Core vier parallele Tasks, auf einem Quad-Core 16 parallele Tasks, usw. .  Man sieht also, dass auf typischen Desktop-Systemen die maximale Anzahl paralleler Tasks ziemlich übersichtlich bleibt, und auch auf den meisten aktuellen Server-Systemen dürfte die Zahl immer noch im dreistelligen Bereich liegen.
     
     

    Schauen wir uns ein Beispiel an, bei dem die Beschränkung durch die maximale Anzahl der parallelen Tasks entsteht und nicht wie bisher durch die Anzahl der Streamelemente.  Wir können dazu einfach das vorhergehende Beispiel nehmen und das Interval in der  range() -Methode so wählen, das der Stream sowohl für die blockierende als auch für die nicht-blockierende Funktionalität acht (statt vier) Elemente groß ist: 
     
     

       Runnable task = () -> {                      

                try {

                    Thread.sleep(1500);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }             
     
     

                IntStream. range(100, 10 8 ) .parallel()

                        .forEach(i -> System.out.println(Thread.currentThread().getName() +

                                                                   ": nonblocking - " + i));

            };
     
     

       new Thread(task,"myThread").start();   
     
     

       IntStream. range(0,8) .parallel()

             .map(i -> {

                    IntBlocker blocker = new IntBlocker(i);

                    try { ForkJoinPool.managedBlock(blocker); }   // Zeile 4

                    catch (InterruptedException e) { }

                    return blocker.getValue();

             })

             .forEach(i -> System.out.println(Thread.currentThread().getName() + ": blocking - " + i));
     
     

    Wenn wir den Code laufen lassen, sieht die Ausgabe nun so aus:

    Abbildung 8: Ausgabe bei Ablauf zweier konkurrierender paralleler  Streams mit acht Elementen (blockierend mit  ManagedBlocker )
     
     

    Nach einer Sekunde werden die ersten vier der blockierenden Zahlen ausgegeben (2, 4, 0, 6).  Es sind nur vier, weil der Stream auf unserem Dual-Core-System in vier parallel laufende Tasks mit je zwei Elemente aufgeteilt wurde.  Die vier ausgegebenen Zahlen (2, 4, 0, 6) sind jeweils die ersten Elemente dieser vier Tasks. 

    Nach 1,5 Sekunden werden alle nicht-blockierenden Zahlen ausgegeben (104, 105, 102,103, 100, 101, 106, 107).  Da sie nicht-blockierend sind, sieht es in der Ausgabe so aus, dass sie alle auf einmal herauskommen.  Ausgegeben werden sie von dem Thread, der die Stream-Operation angestoßen hat ( myThread ), und dem CommonPool-Thread  worker-3 .  Letzterer ist ein Kompensationsthread, da alle anderen CommonPool-Threads ( worker-0 bis  2 ) gerade per  ManagedBlocker blockiert sind. 

    Nach zwei Sekunden, wenn der  sleep() der blockierenden Funktionalität abgelaufen ist, werden die letzen vier blockierenden Zahlen ausgegeben (3, 5, 1, 7). 
     
     

    Wie man sieht, hat der CommonPool nicht nur einen Worker-Thread, wie es defaultmäßig auf unserer Dual-Core-Plattform wäre, sondern wegen dem  ManagedBlocker sind es vorübergehend vier Worker-Threads.  Mehr Threads werden es aber in diesem Beispiel nicht, weil die Stream-Operation in nur vier parallele Tasks zerlegt wurde.

    Zusammenfassung

    Die JDK-Designer haben die parallelen Streams in Java 8 so konzipiert, dass sie für CPU-intensive Funktionalität gut funktionieren.  Trotzdem gibt es immer wieder den Wunsch von Java-8-Benutzern, parallele Streams auch für blockierende Funktionalität zu verwenden.  Wir haben uns in diesem Artikel angesehen, welche Möglichkeiten es dafür gibt. 

     
     

    Auf den JAX-Konferenzen 2014 (/ BGJ8 /) und 2015 hat Brian Goetz erwähnt, dass es in Zukunft eine bessere Unterstützung für blockierende Funktionalität bei den Streams geben soll.  So wie es heute aussieht, wird es diese Unterstützung aber vermutlich noch nicht in Java 9 geben. 

    Literaturverweise

    /SRC/   Source-Code zu diesem Beitrag
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/85.Java8.Streams-and-Blocking-Functionality/Test.java
    /BGJ8/
    Brian Goetz & Angelika Langer: Fragen und Antworten zu Java 8
    /DLMA/ 
    Doug Lea: Mixing Async and Blocking Components
    /JDMB/
    Javadoc der Methode   ForkJoinPool.managedBlock ()
    /KLS1/       Parallel Streams
    Klaus Kreft & Angelika Langer, Java Magazin, Juli 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/81.Java8.Parallel-Streams/81.Java8.Parallel-Streams.html
    /KLS2/   Das Performance-Modell der Streams
    Klaus Kreft & Angelika Langer, Java Magazin, September 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/82.Java8.Performance-Model-of-Streams/82.Java8.Performance-Model-of-Streams.html
    /KLS3/
    Klaus Kreft & Angelika Langer: Fork-Join-Framework, Teil 1: Internals
    /KLS4/
    Klaus Kreft & Angelika Langer: Fork-Join-Framework, Teil 2: Benutzung
    /SOCP/
    Stack Overflow: Custom Thread Pool in Java 8 Parallel Stream


    URL: http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream

    Die gesamte  Serie über Java 8:

    /JAV8-0/ Neue Features in Java 8 - Überblick
    Klaus Kreft & Angelika Langer, Java Magazin, März 2014
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/73.Java8.Overview/73.Java8.Overview.html
    /JAV8-1/ Funktionale Programmierung in Java
    Klaus Kreft & Angelika Langer, Java Magazin, September 2013
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/70.Java8.FunctionalProg/70.Java8.FunctionalProg.html
    /JAV8-2/ Lambda-Ausdrücke und Methoden-Referenzen
    Klaus Kreft & Angelika Langer, Java Magazin, Dezember 2013
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/71.Java8.Lambdas/71.Java8.Lambdas.html
    /JAV8-3/ Default-Methoden und statische Methoden in Interfaces
    Klaus Kreft & Angelika Langer, Java Magazin, Februar 2014
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/72.Java8.DefaultMethods/72.Java8.DefaultMethods.html
    /JAV8-4/ Übersicht über das Stream API in Java 8
    Klaus Kreft & Angelika Langer, Java Magazin, Mai 2014
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/74.Java8.Streams-Overview/74.Java8.Streams-Overview.html
    /JAV8-5/ Stream-Erzeugung und Stream-Operationen
    Klaus Kreft & Angelika Langer, Java Magazin, Juli 2014
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/75.Java8.Fundamental-Stream-Operations/75.Java8.Fundamental-Stream-Operations.html
    /JAV8-6/ Stream-Kollektoren und die Stream-Operation collect()
    Klaus Kreft & Angelika Langer, Java Magazin, September 2014
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/76.Java8.Stream-Collectors/76.Java8.Stream-Collectors.html
    /JAV8-7/ Stateful Lambdas - Regeln für die Seiteneffekte in Lambda-Ausdrücken, die an Stream-Operationen übergeben werden
    Klaus Kreft & Angelika Langer, Java Magazin, November 2014
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/77.Java8.Streams-and-Statefulness/77.Java8.Streams-and-Statefulness.html
    /JAV8-8/ Das Date/Time API
    Klaus Kreft & Angelika Langer, Java Magazin, Januar 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/78.Java8.Date-Time-API/78.Java8.Date-Time-API.html
    /JAV8-9/ CompletableFuture
    Klaus Kreft & Angelika Langer, Java Magazin, März 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/79.Java8.CompletableFuture/79.Java8.CompletableFuture.html
    /JAV8-10/ Optional<T>
    Klaus Kreft & Angelika Langer, Java Magazin, Mai 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/80.Java8.Optional-Result/80.Java8.Optional-Result.html
    /JAV8-11/ Parallel Streams
    Klaus Kreft & Angelika Langer, Java Magazin, Juli 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/81.Java8.Parallel-Streams/81.Java8.Parallel-Streams.html
    /JAV8-12/ Das Performance-Modell der Streams
    Klaus Kreft & Angelika Langer, Java Magazin, September 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/82.Java8.Performance-Model-of-Streams/82.Java8.Performance-Model-of-Streams.html
    /JAV8-13/ reduce() vs. collect()
    Klaus Kreft & Angelika Langer, Java Magazin, November 2015
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/83.Java8.Reduce-vs-Collect-Stream-Operations/83.Java8.Reduce-vs-Collect-Stream-Operations.html
    /JAV8-14/ User-Defined Collectors
    Klaus Kreft & Angelika Langer, Java Magazin, Januar 2016
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/84.Java8.User-Defined-Stream-Collectors/84.Java8.User-Defined-Stream-Collectors.html
    /JAV8-15/ Parallele Streams und Blockierende Funktionalität
    Klaus Kreft & Angelika Langer, Java Magazin, März 2016
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/85.Java8.Streams-and-Blocking-Functionality/85.Java8.Streams-and-Blocking-Functionality.html
    /JAV8-16/ API-Design mit Lambdas
    Klaus Kreft & Angelika Langer, Java Magazin, Mai 2016
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/86.Java8.API-Design-With-Lambdas/86.Java8.API-Design-With-Lambdas.html
    /JAV8-17/ Low-Level-Aspekte beim API Design mit Lambdas
    Klaus Kreft & Angelika Langer, Java Magazin, Juli 2016
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/87.Java8.Programming-With-Lambdas/87.Java8.Programming-With-Lambdas.html
    /JAV8-18/ Benutzer-definierte Stream-Sourcen und Spliteratoren
    Klaus Kreft & Angelika Langer, Java Magazin, September 2016
    URL: http://www.angelikalanger.com/Articles/EffectiveJava/88.Java8.User-Defined-Stream-Sources-And-Spliterators/88.Java8.User-Defined-Stream-Sources-And-Spliterators.html

     
     

    If you are interested to hear more about this and related topics you might want to check out the following seminar:
    Seminar
    Lambdas & Streams - Java 8 Language Features and Stream API & Internals
    3 day seminar ( open enrollment and on-site)
    Java 8 - Lambdas & Stream, New Concurrency Utilities, Date/Time API
    4 day seminar ( open enrollment and on-site)
    Effective Java - Advanced Java Programming Idioms 
    4 day seminar ( open enrollment and on-site)
     
    Related Reading
    Lambda & Streams Tutorial & Reference
    In-Depth Coverage of all aspects of lambdas & streams
    Lambdas in Java 8
    Conference Presentation at JFokus 2012 (slides)
    Lambdas in Java 8
    Conference Presentation at JavaZone 2012 (video)
     

     
     
      © Copyright 1995-2018 by Angelika Langer.  All Rights Reserved.    URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/85.Java8.Streams-and-Blocking-Functionality/85.Java8.Streams-and-Blocking-Functionality.html  last update: 26 Oct 2018