Angelika Langer - Training & Consulting
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | NEWSLETTER | CONTACT | Twitter | Lanyrd | Linkedin
 
HOME 

  OVERVIEW

  BY TOPIC
    JAVA
    C++

  BY COLUMN
    EFFECTIVE JAVA
    EFFECTIVE STDLIB

  BY MAGAZINE
    JAVA MAGAZIN
    JAVA SPEKTRUM
    JAVA WORLD
    JAVA SOLUTIONS
    JAVA PRO
    C++ REPORT
    CUJ
    OTHER
 

GENERICS 
LAMBDAS 
IOSTREAMS 
ABOUT 
NEWSLETTER 
CONTACT 
Effective Java

Effective Java
Java 8
CompletableFuture
 

Java Magazin, März 2015
Klaus Kreft & Angelika Langer

Dies ist die Überarbeitung eines Manuskripts für einen Artikel, der im Rahmen einer Kolumne mit dem Titel "Effective Java" im Java Magazin erschienen ist.  Die übrigen Artikel dieser Serie sind ebenfalls verfügbar ( click here ).

 

Wir setzen unsere Reihe zum Thema Java 8 Neuerungen diesmal mit dem CompletableFuture fort.  Das CompletableFuture überträgt den Progammierstil des Fluent Programming , den wir uns bereits bei den Streams angesehen haben, auf das Behandeln asynchroner Ergebnisse.  Schauen wir uns aber zuerst an, welche Möglichkeiten es im JDK bisher für das Behandeln asynchroner Ergebnisse gab.
 

Ein kurzer Rückblick

Seit Java 5 gibt es im JDK das Interface  Future<T> , mit dessen überladener Methode  get() man auf asynchrone Ergebnisse warten kann.  Hier ist ein Beispiel: 

 
 

ExecutorService pool = ...

Future<String> future = pool.submit(()->getStockInfo("YHOO")); // Zeile 2
 
 

String stockInfo = "no result";

try { stockInfo = future.get(); }                              // Zeile 5

catch (InterruptedException | ExecutionException e) { ... }
 
 

System.out.println("YHOO: " + stockInfo);                      // Zeile 8
 
 

Hier implementieren wir in Zeile 2 mit Hilfe einer Lambda Expression das  Callable Interface und übergeben mit  submit() dieses  Callable an einen Threadpool, der es asynchron ausführen soll.  Inhaltlich besteht unsere Implementierung aus dem Aufruf der Methode  getStockInfo() mit dem Stringparameter  YHOO .  Dahinter steht ein HTTP-Request, der den aktuellen Kurs der Yahoo-Aktie ermittelt und als  String zurückgibt.  Wir erhalten vom Aufruf von  submit() ein  Future<String> zurück.  Dieses  future repräsentiert ein Handle auf das zukünftige  String -Ergebnis.

Die Pull-Lösung

Um aus unserem  future das Ergebnis zu bekommen, rufen wir in Zeile 4 auf dem  future die  get() -Methode auf.  Dieser Aufruf blockiert unseren Thread so lange, bis das Ergebnis asynchron ermittelt worden ist oder uns per  ExecutionException mitgeteilt wird, dass die Ausführung des  Callable s schief gegangen ist.

 
 

In Zeile 8 geben wir das Ergebnis aus, oder den Default-Text  “n o result , falls ein Fehler aufgetreten ist.
 
 

Dieser Pull-Ansatz funktioniert meist ganz zufriedenstellend.  Typischerweise stößt man aber an seine Grenzen, wenn man sehr viele Tasks an den Thread-Pool übergibt.  Man braucht dann nämlich für jede Task je einen Thread, um im blockierenden Aufruf von  get() auf das Ergebnis der jeweiligen Task zu warten. 
 
 

Das heißt, bei sehr vielen Tasks benötigen wir auch sehr viele Threads, die warten.  Das ist ziemlich ressourcenintensiv.  Zum einen kostet jeder Thread Speicherplatz.  Zum anderen ist die Anzahl von parallel nutzbareren Threads (plattformabhängig) beschränkt.  Das heißt, die Pull-Lösung skaliert nicht beliebig.

Die Poll-Lösung

Deshalb gibt es im  Future Interface auch noch eine weiter überladene Variante der  get() -Methode:
V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

 
 

Diese Methode hat einen Timeout-Parameter, der aus zwei Teilen besteht: einem  long -Wert und einem  TimeUnit -Wert, der angibt, in welcher Zeiteinheit  ( NANOSECONDS MICROSECONDS MILLISECONDS , usw.) der  long -Wert zu interpretieren ist.  Wird nach dem Aufruf von  get() der Timeout erreicht, wirft die Methode eine  TimeoutException und signalisiert so, dass das asynchrone Ergebnis noch nicht vorliegt.
 
 

Um nun auf die asynchronen Ergebnisse von sehr vielen Tasks zu warten, kann man in einer Schleife diese  get() -Methode  mit Timeout auf jedem korrespondierenden  Future aufrufen.  Dabei wird man den Timeout relativ kurz wählen (vielleicht sogar: 0  NANOSECONDS ), um die Latenzzeit zwischen Eintreffen des Ergebnisses und Weiterverarbeiten des Ergebnisses möglichst gering zu halten. 

Mit dem Poll-Ansatz benötigen wir nun nur noch einen Thread (die Schleife!) für alle Ergebnisse.  Was die Thread-Ressourcen angeht, ist dieser Ansatz also viel genügsamer.  Andererseits benötigt man nun zusätzliche CPU-Zeit für das Pollen.  Dabei richtet sich die Höhe des CPU-Verbrauchs im wesentlich danach, wie häufig wir beim  get() -Aufruf in den Timeout laufen und ohne Ergebnis zurückkommen.  Die Tuningparameter dafür sind die Timeout-Zeit und die Reihenfolge der  Future s in der Schleife.  Wie gut diese Lösung dann ist, hängt sehr von der jeweiligen Situation ab.

CompletionService: die Queue-Poll-Lösung

Die Abstraktion  CompletionService bietet eine Alternative zur oben beschriebenen direkten Nutzung des  Future -Interfaces.  Der  CompletionService legt die  Future s in einer Result-Queue ab und zwar in der Reihenfolge, in der die Resultate entstanden sind.  In der Result-Queue befinden sich also die „fertigen“  Future s, d.h. diejenigen, die bereits ein Ergebnis enthalten.  Aus der Queue holen kann man die  Future s und ihre Ergebnisse – ähnlich wie man es auch direkt beim  Future macht – mit einer  poll() -Methode, die sofort zurückkommt, oder einer  poll() -Methode mit Timeout.  Es gibt auch noch die Möglichkeit, mit  take() an der leeren Queue zu warten, bis das nächste  Future in der Queue eintrifft. 

 
 

Auch beim  CompletionService wird also ein Pull oder Poll gemacht, allerdings genügt wegen der Result-Queue ein einziger Thread, der alle Ergebnisse abholt und sie nacheinander verarbeitet.

CompletableFuture: die Push-Lösung

Mit Java 8 und dem  CompletableFuture gibt es nun einen weiteren Lösungsansatz. Er lässt sich sehr elegant implementieren und belastet weder Thread- noch CPU-Ressourcen.  Schauen wir uns zuerst an, wie das obige Beispiel (asynchrones Ermitteln des Aktienkurses über HTTP) damit aussieht:

 
 

ExecutorService pool = ...

CompletableFuture<String> future

        = CompletableFuture.supplyAsync(()->getStockInfo("YHOO"), pool);  // Zeile 3
 
 

future.thenAccept(info->System.out.println("YHOO: " + info));             // Zeile 5
 
 

In Zeile 3 erzeugen wir ein  CompletableFuture<String> ; es ist das Handle auf das zukünftige Ergebnis.  Dazu rufen wir die statische Methode  supplyAsync() und übergeben als Parameter die Lambda-Expression mit der auszuführenden Funktionalität und den Thread-Pool.  Hier sieht man schon einen Unterschied im API: während das  Future direkt vom  ExecutorService beim Übergeben der asynchron auszuführenden Funktionalität erzeugt wird (erstes Beispiel), gibt es für das  CompletableFuture statische Factory-Methoden, denen Task und  ExecutorService als Parameter übergeben wird.
 
 

Der entscheidende Punkt ist aber nun, wie das  CompleteFuture weiter verwendet wird.  Wir rufen auf dem  CompleteFuture die Methode thenAccept() auf und übergeben als Parameter eine Lambda-Expression, die auf das asynchrone Ergebnis angewandt wird, wenn es dann eingetroffen ist.  Der Thread, der den Aufruf der Methode thenAccept() macht, läuft nach dem Aufruf von  thenAccept() weiter, unabhängig davon, ob das asynchrone Ergebnis schon vorliegt oder nicht.
 
 

Das heißt, das Eintreffen des asynchronen Ergebnisses triggert die Ausführung der in Zeile 5 als Parameter übergebenen Lambda-Expression.  Daher stammt auch der Name: Push-Lösung. In unserem Fall gibt diese Lambda-Expression das Ergebnis auf STDOUT aus.  Auf den ersten Blick sieht diese Lösung fast aus wie ein Callback, insbesondere in unserem Beispiel: da wir  thenAccept() benutzt haben, führt der Poolthread, der das asynchrone Ergebnis ermittelt hat, anschließend synchron auch die in Zeile 5 übergebene Funktionalität aus.  Aber es steckt deutlich mehr dahinter.  Wenn wir z.B. in Zeile 5 statt  thenAccept() die Methode  thenAcceptAsync() verwenden, wird die als Parameter übergebene Lambda-Expression asynchron ausgeführt. 
 
 

Die Funktionalität des Push-Beispiel entspricht im Wesentlichen der des Pull-Beispiel weiter oben.  Es gibt einen kleinen Unterschied: im Fehlerfall wird beim Push-Beispiel gar nichts ausgegeben.  Die Details der Fehlerbehandlung beim  CompleteFuture schauen wir uns in diesem Beitrag weiter unten an.
 
 

Bleiben wir im Moment noch ein wenig beim konzeptionellen Modell der Push-Lösung.  Beide Methoden  thenAccept() und  thenAcceptAsync() verarbeiten das vorhergehende Ergebnis, ohne selbst wieder ein eigenes, neues Ergebnis zu liefern. Alternativ dazu gibt es die Methoden  thenA pply () und  thenA pply Async() .  Sie nehmen als Parameter Funktionalität vom Typ  Function<T,R> , die wieder ein Ergebnis erzeugt.  Damit kann man dann eine Kette von Verarbeitungsschritten (engl.: Stages ) erstellen, bei denen das Ergebnis der vorhergehenden Stufe jeweils die nachfolgende Stufe startet (push).  Die Verbindung zwischen den Schritten kann dabei synchron oder asynchron sein je nachdem, ob man die synchrone oder asynchrone Methode wählt. Implementieren lässt sich das im Stil des Fluent Programming s, den wir bereits von der Stream-Programmierung kennen (/ KRE /):
 
 

CompletableFuture.supplyAsync(()->getStockInfo("YHOO"), pool)

                 .thenApplyAsync( ... )

                 .thenApply( ... )

                 .thenAccept( ...); 

             

Die Details des API schauen wir uns weiter unten an.
 
 

Typ-Hierarchie

Superinterface  Future

Bisher hat es so geklungen, als würde das  CompletableFuture nur die Push-Lösung unterstützen.  Das ist aber nicht so, wie ein Blick auf die Typhierarchie von  CompletableFuture schnell klar macht:
Abbildung 1: Typhierarchie von  CompletableFuture

 
 

Die Klasse  CompletableFuture implementiert u.a. auch das Interface  Future .  Das heißt, das  CompletableFuture unterstützt neben der Push-Lösung auch die Pull- und Poll-Lösung.  Wir können also das Beispiel aus der Pull-Lösung von ganz oben auch mit einem  CompletableFuture statt mit einem  Future implementieren:
 
 

ExecutorService pool = ...

CompletableFuture<String> future

        = CompletableFuture.supplyAsync(()->getStockInfo("YHOO"), pool);
 
 

String stockInfo = "no result";

try { stockInfo = future.get(); }                             

catch (InterruptedException | ExecutionException e) { ... }
 
 

System.out.println("YHOO: " + stockInfo);              
 
 
 
 

Weiter bedeutet es, dass auch hybride Lösungen mit dem  CompletableFuture möglich sind.  Dies hier ist eine Push-Pull-Lösung:
 
 

try {

   result = CompletableFuture.supplyAsync(()->getStockInfo("YHOO"), pool)

                             .thenApplyAsync( ... )

                             .thenAppl( ... )

                             .get();           // Zeile 5

} catch (InterruptedException | ExecutionException e) { ... }
 
 

Die Verarbeitung erfolgt erst über einige Push-Stages, aber am Ende in Zeile 5 wartet der Thread dann auf das Gesamtergebnis (Pull-Lösung). 
 
 

Die Idee dahinter ist, dass das  CompletableFuture den maximalen Funktionsumfang einer  Future -Abstraktion im JDK unterstützt, und der JDK Benutzer entscheidet, was er davon wie nutzen möchte.

Superinterface CompletionStage             

Nach der Vorstellung des  CompletableFuture als Concurrency Utility für Java 8 (/ LEA1 /) wurde aber gerade dieser Ansatz kritisiert.  Es gab heftige Diskussionen darüber, dass das  CompletableFuture nicht nur das neue fluent Push-API, sondern weiterhin auch das alte  Future -API anbietet.  Es wurde beklagt, dass Bibliotheksentwickler (engl.: Library Writer ) nicht nur das Push-API anbieten können, wenn sie das  CompletableFuture hernehmen wollen, um eigene asynchrone Ergebnisse darüber zugänglich zu machen. Eine Bibliotheksschnittstelle könnte ein  CompletableFuture zurückliefert wie hier:

 
 

CompletableFuture<String> asyncRequest() {   // Ansatz 1

   CompletableFuture<String> future = new CompletableFuture<>();

   …

   return future;

}
 
 

Der Benutzer verwendet die Schnittstelle:
 
 

CompletableFuture<String> future = Library.asyncRequest( ... );
 
 

Dann wird dem Benutzer immer auch der Pull- und Poll-Zugang möglich und nicht allein das Push-API. 
 
 

Auf Grund dieses Feedbacks wurde dann die Funktionalität des Push-API in das Superinterface  CompletionStage herausgezogen, siehe (/ LEA2 /).  Das heißt, der zurück gelieferte Typ, der nur das Push-API zur Verfügung stellt, ist  CompletionStage .  Eine Bibliotheksschnittstelle kann und sollte also  CompletionStage zurückliefern, wenn sie nur das Push-API anbieten möchte:
 
 

CompletionStage<String> asyncRequest() {   // Ansatz 2

   CompletableFuture<String> future = new CompletableFuture<>();

   …

   return future;

}
 
 

Der Benutzer bekommt dann nur noch ein Objekt vom Typ  CompletionStage :
 
 

CompletionStage<StringY future = Library.asyncRequest( ... );
 
 

Intern in der Implementierung der Bibliothek wird aber weiterhin  CompletableFuture verwendet.  Es besteht so die Gefahr, dass ein Nutzer der Bibliothek das erhaltene Objekt auf  CompletableFuture zurück castet und Code wie diesen schreibt:
 
 

CompletableFuture<String> future

     = (CompletableFuture<String>)Library.asyncRequest( ... );  // Ansatz 3
 
 

Damit kann er dann über das  CompletableFuture auch die Pull- oder Poll-Funktionalität wieder verwenden.
 
 

Um mit dieser Situation richtig umgehen zu können, enthält das Interface  CompletionStage die Methode:
 
 

CompletableFuture<T> toCompletableFuture()
 
 

Sie stellt so etwas wie den offiziell vorgesehen Cast auf das  CompletableFuture zur Verfügung.  Der Bibliotheksentwickler kann nun entscheiden, ob er die Umwandlung in ein  CompletableFuture zulassen will oder nicht.  Wenn er sie zulassen will,  braucht er nichts weiter tun, denn die Default-Implementierung von  toCompletableFuture() gibt  this zurück.  Wenn er die Umwandlung abklemmen will, dann sieht der Code zum Beispiel so aus: 
 
 

CompletionStage<String> asyncRequest() {                       // Ansatz 4

   class ReactiveFuture<String> extends CompletableFuture<String> {

      public CompletableFuture<String> toCompletableFuture() {

        throw new  UnsupportedOperationException () ;

      }

      public String get() {

        throw new  UnsupportedOperationException () ;

      }

      ...  analog für die übrigen Future-Methoden ...

   }
 
 

   CompletableFuture<String> future = new ReactiveFuture<>();

   …

   return future;

}
 
 

Die Bibliotheksschnittstelle gibt nun einen lokalen Subtyp  ReactiveFuture von  Complet ableFuture zurück.  Die Ableitung überschreibt  toCompletableFuture() so, dass die Methode eine  UnsupportedOperationException wirft (siehe / TCF /).  Zusätzlich muss man auch alle aus dem Interface  Future geerbten Methoden, insbesondere die beiden  get() Methoden, so überschreiben, dass sie eine  UnsupportedOperationException werfen.  Denn der Brute-Force-Cast aus Ansatz 3 funktioniert ja weiterhin, aber durch das Überschreiben der  Future -Methoden kann der Benutzer nun nicht mehr auf die Pull- und Poll-Funktionalität zugreifen.

Das Benutzer API

Schauen wir uns nun das Benutzer-API des  CompletableFuture genauer an. 

Factory-Methoden

Wir haben in den vorhergehenden Beispielen bereits gesehen, dass es statische Factory-Methoden in der Klasse  CompletableFuture gibt, mit denen man ein  CompletableFuture erzeugen kann. Wir haben bisher nur eine einzige Variante von der überladenen Methode  supplyAsync() verwendet.  Dies ist nun die Liste aller Factory-Methoden:

 
 

static <U> CompletableFuture<U> supplyAsync(Supplier<U> task)

static <U> CompletableFuture<U> supplyAsync(Supplier<U> task, Executor executor)

static CompletableFuture<Void> runAsync(Runnable task)

static CompletableFuture<Void> runAsync(Runnable task, Executor executor)
 
 

Die als Parameter übergebene Task kann entweder vom Typ  Supplier <U> sein; dann erzeugt sie ein Ergebnis vom generischen Typ  U .  Die als Parameter übergebene Task kann  alternativ ein  Runnable sein, das kein Ergebnis hat.  Abhängig davon erzeugt die Factory-Methode dann ein  CompletableFuture<U> (Ergebnis vom Typ  U ) bzw. ein  CompletableFuture<Void> (kein Ergebnis). 
 
 

Auffallend ist vielleicht, dass die ergebniserzeugende Task vom Interface-Typ  java.util.function.Supplier< U > ist und nicht  java.util.concurrent.Callable<U> wie bisher in ähnlichen Situationen.  Andere Abstraktionen im Package  java.util.concurrent verwenden üblicherweise das  Callable (zum Beispiel  ExecutorService .submit() ).  Der Grund dafür ist einfach, dass  Callable s Checked-Exceptions werfen dürfen (was der  Supplier nicht darf) und der Fluent-Programming-Stil unter Verwendung von Lambdas sich nicht gut mit Checked-Exceptions verträgt.  Details dazu finden sich hier: / LEA3 /.
 
 

Der zweite optionale Parameter der Factory-Methoden ist der Thread-Pool vom Typ  Executor .  Das ist der Pool, von dem die Task ausgeführt werden soll.  Wenn man keinen Thread-Pool als Parameter mitgibt,

wird der neue „default“ Thread-Pool des JDK 8, nämlich  java.util.concurrent.ForkJoinPool.commonPool() , verwendet, falls dieser Pool eine Parallelität (englisch : parallelism ) >= 2 hat, oder sonst

wird explizit ein neuer Thread für die Ausführung der Task gestartet.

             

Details zum Common Pool finden sich in der Javadoc des  ForkJoinPool s (/ FJP /).  Die Einstellung der Parallelität ist in der Javadoc nicht beschrieben.  Dazu muss man sich den Sourcecode ansehen und zwar die  private Methode  ForkJoinPool.makeCommonPool() .  Da sieht man dann, dass die Parallelität im Default-Fall bei einem Multi-Core-System die Anzahl der Cores – 1 ist (genauer:  Runtime.get Runtime().availableProcessors() –1 ).  Bei einem Single-Core-System ist die Parallelität 1. Daneben lässt sich die Parallelität explizit mit Hilfe der System-Property  java.util.concurrent.ForkJoinPool.common.parallelism festlegen (siehe / FJP /). 

Von  CompletionStage geerbte Methoden

Die von  CompletionStage geerbtem Methoden bilden mit knapp vierzig Methoden den umfangreichsten Teil des  CompletableFuture API.  Wie wir bereits oben unter Push-Lösung diskutiert haben, erlauben diese Methoden, Programme im Fluent-Programming-Stil in synchrone bzw. asynchrone Verarbeitungsschritte aufzuteilen. 
Wir wollen uns im Folgenden nicht jede Methode im Detail ansehen, denn dies würde den Rahmen des Artikels sprengen.  Uns geht es vielmehr darum zu vermitteln, wie dieser Teil des APIs strukturiert ist.  Dieses Wissen erlaubt es dann, mit Hilfe der Javadoc die gewünschte Funktionalität zu finden.

 
 

Die elementare Basisfunktionalität wird von den drei Methoden
 
 

<U> CompletableFuture<U>    thenApply  (Function<T,U> task)

    CompletableFuture<Void> thenRun    (Runnable      task)

    CompletableFuture<Void> thenAccept (Consumer<T>   task)
 
 

zur Verfügung gestellt.  Ähnlich wie bei den Factory-Methoden, die wir uns vorher angesehen haben, gilt auch hier: der Methodenname bestimmt den Typ der Task, die als Parameter übergeben wird und ausgeführt werden soll:

thenApply() nimmt als Parameter eine  Function<T,U> , die das Ergebnis der vorhergehenden Stufe als Input (Typ  T ) bekommt und selbst wieder ein Ergebnis (vom Typ  U ) produziert und zurückgibt.  Der Returntyp von  thenApply() ist daher  CompletableFuture<U> .

thenRun() nimmt als Parameter ein  Runnable , das weder einen Input hat noch ein Ergebnis produziert.  Der Returntyp von  thenRun() ist daher  CompletableFuture<Void> .

thenAccept() nimmt als Parameter einen  Consumer<T> , der das Ergebnis der vorhergehenden Stufe als Input (Typ  T ) bekommt, selbst aber kein Ergebnis produziert.  Der Returntyp von  thenAccept () ist daher  wieder  CompletableFuture<Void> .
 
 

Von jeder dieser drei Methoden gibt es je drei Varianten.  Sehen wir uns diese am Beispiel von  thenRun() an:

CompletableFuture<Void> thenRun      (Runnable  task )

CompletableFuture<Void> thenRunAsync (Runnable  task )

CompletableFuture<Void> thenRunAsync (Runnable  task , Executor executor)

Die Varianten unterscheiden sich dabei dadurch, wie sie die übergebene Task ausführen.   thenRun() führt die Task synchron mit dem Thread aus, der die vorhergehende Stufe ausgeführt hat.  Für thenRunAsync() gilt das gleiche, wie oben bei den asynchronen Factory-Methoden beschrieben.  Die Variante mit dem expliziten  Ececutor führt die Task asynchron auf diesem Thread-Pool aus.  Die Variante ohne Executor führt die Task asynchron auf dem CommonPool aus. Falls dessen Parallelität kleiner als zwei ist, wird explizit ein Thread gestartet, in dem die Task asynchron ausgeführt wird.
 
 

So ergeben sich als Basisfunktionalität neun Methoden, mit denen man Verarbeitungsstufen hintereinander verketten kann.
 
 

Dazu kommt dann noch:
 
 

<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> task)
 
 

Diese Methode ist so etwas wie das flat-map für das  CompletableFuture .  Wie man an der Signatur sieht, ist das Ergebnis der Task bereits eine  CompletionStage<U> , welche dann als  CompletableFuture<U> von  the n Compose() zurück gegeben wird.  Würde man diese Task mit  thenApply() statt mit  thenCompose() ausführen, würde man ein  CompletableFuture<CompletableFuture<U>> erhalten.  Das ist nicht das, was man im Allgemeinen als Ergebnis haben will, wenn man fluent die nächste Stufe anschließen möchte.  Deshalb benutzt man in einem solchen Fall  thenCompose() statt  thenApply() .  Ergänzend zum synchronen  thenCompose() gibt es auch wieder zwei asynchrone Versionen von  thenComposeAsynch() .
 
 

In dem von  CompletionStage geerbten API gibt es desweiteren Methoden, die es erlauben, eine Verarbeitungsstufe nicht nur an eine, sondern an zwei vorhergehende Verarbeitungsstufen anzuschließen. 
 
 

Dabei kann der Anschluss so erfolgen, dass die nachfolgende Stufe ausgeführt wird, nachdem nur eine der vorhergehenden Stufen ausgeführt wurde.  Zum Beispiel:
 
 

<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,

                                       Function<? super T,U> task)
 
 

Die vorhergehenden Stufen sind dabei zum einen das  this auf dem die Methode aufgerufen wird und zum anderen der erste Parameter der Methode.  Abhängig davon welche der beiden Stufen zuerst ausgeführt worden ist, geht das erste Ergebnis als Input an die Task.
 
 

Der Anschluss kann aber auch so erfolgen, dass die nachfolgende Stufe ausgeführt wird, nachdem nicht nur eine, sondern beide vorhergehenden Stufen ausgeführt worden sind.  Zum Beispiel:
 
 

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, 
                                        BiFunction<? super T,? super U,? extends V> task)

Wie man an der Signatur sehen kann, ist in diesem Fall die Task eine  B iFunc tion , die die Ergebnisse beider vorgehender Verarbeitungsstufen als Input erhält.
 
 

Natürlich gibt zu  applyToEither() und  thenCombine() auch wieder die beiden asynchronen Varianten. 
 
 

Zusätzlich gibt für beide Anschlussarten  applyToEither (Function) und  thenCombine (BiFunction) abhängig von der Art der Task, die ausgeführt werden soll ( Function Runnable oder  Consumer ), weitere Methoden  acceptEither( Consumer ) runAfterEither(Runnable) und  thenAcceptBoth(BiConsumer) runAfterBoth(Runnable) . Auch diese Methoden gibt es natürlich wieder in drei Varianten.

Das Behandeln von Fehlern
Bisher haben wir so getan, als würde in einer Verarbeitungspipeline kein Fehler auftreten und jede Stufe ohne Probleme ihren Nachfolger starten.  Was ist aber, wenn Fehler in einer Stufe auftreten und dabei unchecked Exceptions oder Errors geworfen werden? (Wir haben bereits oben diskutiert, dass keiner der Tasktypen checked Exceptions zulässt [1] .)  Grundsätzlich hat man zwei Möglichkeiten: Man kann selbst lokal in der Task auf den Fehler reagieren oder den Error Handling Support des  CompletableFutures nutzen.

 
 

Mit „lokal selbst in der Task reagieren“ ist gemeint, dass man die Exception schon in der Task selbst wieder fängt und die Task normal beendet wird.  Wenn nötig kann die Task zusätzlich auch ein sinnvolles Default-Ergebnis liefern.  Dies kann zum Beispiel  null sein.  Die Folgestufe muss dann natürlich auch in der Lage sein, mit diesem Default-Wert als Input umgehen zu können.  Dieser Ansatz bedeutet, dass man einigen Aufwand in die lokale Fehlerbehandlung stecken muss und die einzelnen Verarbeitungsstufen genau auf einander abstimmt sein müssen.
 
 

Schauen wir uns deshalb nun auch noch an, was das  CompletableFuture beim Error Handling Support bietet; vielleicht kann man sich ja damit Arbeit sparen.
 
 

Was ist, wenn die Task einer Verarbeitungsstufe mit einer unchecked Exception abgebrochen wird?  Dann werden alle folgenden Stufen nicht mehr ausgeführt und die Exception geht verloren, wenn man keine weiteren Vorkehrungen trifft.  Das heißt, die Pipeline wird mit der Stufe, in der die Exception aufgetreten ist, abgebrochen.  Der Abbruch wird aber nicht nach Außen signalisiert.  Dies gilt, wenn die Verarbeitungsstufen mit einer der Methoden, die wir oben diskutiert haben, implementiert worden ist.  Es gibt aber für die Fehlererkennung und -behandlung sowie das mögliche Wiederaufsetzen der Pipeline spezielle Methoden, die wir uns nun genauer ansehen wollen:
 
 

    CompletableFuture<T> exceptionally(Function<Throwable,T>     task)

    CompletableFuture<T> whenComplete (BiConsumer<T,Throwable>   task)

<U> CompletableFuture<U> handle       (BiFunction<T,Throwable,U> task) 
 
 

Die Methode  exceptionally() erlaubt es, einen neues  CompletetableFuture<T > mit einem neuen Wert zu erzeugen, wenn die vorhergehende Stufe mit einer Exception abgebrochen wurde.  Schauen wir uns dazu ein Test-Beispiel an:
 
 

boolean flag = ...  // Zeile 1

CompletableFuture.supplyAsync(() -> {

                                       if (flag)

                                          throw new IllegalStateException("TEST");

                                       return "hello world";

                                    })

                 .exceptionally(e -> {

                                        System.err.println("exception occurred: " + e);

                                        return "new text";

                                     })

                 .thenAccept(s -> System.out.println(s));
 
 

Wenn das  flag in Zeile 1 mit  false initialisiert wird, ist der Returnwert der ersten Task " hello world" .  Da keine Exception geworfen wurde, wird das  exceptionally() übersprungen und gleich das  thenAccept () mit dem " hello world" als Input ausgeführt.  Der Output des Programms ist also:  hello world .

Wenn das  flag in Zeile 1 mit  true initialisiert wird, wirft die erste Task eine  IlegalStateException .  Die Task wird abgebrochen und liefert keinen Returnwert.  Da die vorhergehende Stufe mit einer Exception abgebrochen, wurde wird nun das  exceptionally() ausgeführt.  Wir geben die Exception, die wir als Parameter erhalten, auf STDERR aus und geben einen Ersatzttext ( "new text" ) zurück. Dieser wird nun als Input an das  thenAccept() übergeben und dort ausgegeben. Der Output des Programms ist nun:

exception occurred: java.util.concurrent.CompletionException: java.lang.IllegalStateException: TEST

new text

Man sieht an der Ausgabe, dass die eigentliche Exception ( IllegalStateException ) in eine  CompletionException eingepackt ist.
 
 

Wenn man diese Fehlerbehandlung mit der Lösung vergleicht, die wir oben als „lokal selbst in der Task reagieren“ beschrieben haben, so sind beide gar nicht so unterschiedlich.  In beiden Fällen wird im Fehlerfall ein neuer Wert erzeugt, der der nächsten Stufe als Input dient.  Die Vorteile der Lösung mit  exceptionally() sind:

der Code der Task muss nicht verändert werden (keine zusätzlichen try-catch-Blöcke),

die explizite Fehlerbehandlungsstufe auf oberster Ebene im Code ist deutlich sichtbar und unabhängig von der vorhergehenden Task implementiert.
 
 

Mit der Methode  handle() lässt sich die Fehlerbehandlung in die Folgestufe ziehen.  Das heißt die Funktionalität von  exceptionally() und  thenAccept() aus dem vorhergehenden Beispiel kann alternativ in  handle() implementiert werden.  Der Code sieht dann so aus:
 
 

boolean flag = ...  // Zeile 1

CompletableFuture.supplyAsync(() -> {

                                       if (flag)

                                          throw new IllegalStateException("TEST");

                                       return "hello world";

                                     })

                 .handle((s, e) -> {

                                      if (e != null) {

                                         System.err.println("exception occurred: " + e);

                                         s = "new text";

                                      }

                                      System.out.println(s);

                                      return (Void)null;

                                   });
 
 

Die  BiFunktion von  handle () hat zwei Input-Parameter.  Der erste ist das potentielle Ergebnis, der zweite die potentielle Exception der vorhergehenden Stufe.  Einer davon ist immer  null , je nachdem ob die vorhergehende Stufe normal beendet wurde (Exception  null ) oder mit einer Exception abgebrochen wurde (Ergebnis  null ).  Wenn man das weiß, ist die Implementierung leicht zu verstehen.  Sie liefert das gleiche Ergebnis wie das vorhergehende Beispiel.  Der Vollständigkeit halber sollte noch erwähnt werden, dass es neben  handle() zwei weitere asynchrone Varianten  handleAsync() gibt.
 
 

Kommen wir nun noch zu
 
 

CompletableFuture<T> whenComplete(BiConsumer<T,Throwable> task)
 
 

Die Methode dient dazu, den Ausgang der vorhergehenden Stufe mitzuloggen.  Die beiden Input-Parameter des  BiConsumer sind wieder das potentielle Ergebnis und die potentielle Exception der vorhergehenden Stufe.  Typischerweise würde man sie in der Implementierung des  BiConsumer s nutzen, um sie zu loggen.   Man kann keinen neuen Wert setzten.  Das heißt, auf den Returnwert von  whenComplete() haben wir gar keinen Einfluss; es wird einfach der Returnwert der vorhergehenden Stufe verwendet.  Zu  whenComplete() gibt es auch wieder zwei Varianten von  whenCompleteAsync() .
 
 

Bevor wir uns ein Beispiel für die Nutzung von  whenComplete() ansehen, wollen wir unser Augenmerk noch mal auf einen Aspekt richten, den wir bereits kurz erwähnt haben.  Die „normalen“ Stufen-Methoden wie  thenApply() thenRun () thenAccept() und ihre asynchronen Varianten werden nicht aufgerufen, wenn in einer vorhergehenden Stufe eine Exception geworfen wurde.  Nur die Methoden für die Fehlerbehandlung  exceptionally() whenComplete() handle() und ihre asynchronen Varienten (bei  exceptionally() gibt es keine) werden in einer solchen Situation noch aufgerufen.  Das bedeutet, dass die Input-Parameter für die Fehlerbehandlungsmethoden nicht aus der im Code vorhergehenden Stufe stammen, sondern aus der vorhergehenden Stufe, die ausgeführt worden ist.  Schauen wir uns mit diesem Aspekt im Hinterkopf ein Beispiel für die Benutzung von  whenComplete() an:
 
 

boolean flag = ...  // Zeile 1

CompletableFuture.supplyAsync(() -> {

                                      if (flag)

                                        throw new IllegalStateException("TEST");

                                      return "hello world";

                                    })

                 .thenAccept(s -> System.out.println(s))

                 .whenComplete((v,e) -> {

                                          if (e != null)

                                            System.err.println("exception occurred: " + e);

                                        });
 
 

Wenn das  flag in Zeile 1 mit  false initialisiert wird, gibt das  thenAccept() hello world aus.  Das  whenComplete() wird zwar aufgerufen, tut aber nichts.  Angemerkt werden sollte vielleicht noch, dass beide Input-Parameter von  whenComplete() den Wert  null haben. Das  e ist  null , weil vorher keine Exception geworfen wurde, und das  v ist  null , weil der Returnwert der Methode  thenAccept() vom Typ  CompletableFuture<Void> ist, dass heißt, die Task hat kein Ergebnis produziert.
 
 

Interessant wird das Ganze, wenn wir das  flag in Zeile 1 mit  true initialisieren.  Die  supplyAsync() -Stufe wird mit einer  IllegalStateException abgebrochen.  Die  thenAccept() -Stufe wird gar nicht aufgerufen, weil die vorhergehende Stufe nicht normal beendet wurde.  Die  whenComplete() -Stufe wird aber wieder aufgerufen, weil sie ja eine Fehlerbehandlungsstufe ist.  Als Exception-Parameter erhält sie die Exception aus dem  supplyAsync()  und gibt sie aus:
 
 

exception occurred: java.util.concurrent.CompletionException: java.lang.IllegalStateException: TEST
 
 

Mit dem  whenComplete() am Ende der Pipeline behandeln (= loggen) wir alle Exceptions, die in irgendeiner Stufe in der Pipeline auftreten.  Dies ist unter Umständen ein sinnvoller Fehlerbehandlungsansatz: ein globales Errorhandling für die ganze Pipeline, statt jede Stufe einzeln auf Fehler zu überwachen.  Ein Kriterium für die Entscheidung, ob die Fehlerbehandlung am Ende der Pipeline ausreicht, ist ganz sicher die Frage, ob die Verarbeitungsstufen Seiteneffekte haben.  Wenn ja, wird man die Fehlerbehandlung vermutlich sehr nah an das Auftreten des Fehler heranbringen wollen, um sich klar darüber zu werden, welche Seiteneffekte bereits getätigt worden sind, und wie mit diesen umzugehen ist (Rollback, …).  Wenn nein, dann ist die Fehlerbehandlung am Ende der Pipeline häufig eine sinnvolle, weil wenig aufwändige Lösung.

Von Future geerbte Methoden

Die von  Future geerbten Methoden funktionieren im Prinzip wie bisher.  Eine Ausnahme bildet dabei aber  cancel() .  Da das  CompletableFuture keine Kontrolle über den Thread hat, der die abzubrechende Task ausführt, funktioniert  cancel() nur eingeschränkt und manchmal mit überraschendem Verhalten.  Schauen wir uns dazu ein Beispiel an:

 
 

Runnable runner = () -> {

                          System.out.println("started ...");

                          try {Thread.sleep(500); }

                          catch (InterruptedException ie) { return; }

                          System.out.println("... finished");

                        };
 
 

                CompletableFuture<Void> future = CompletableFuture.runAsync(runner);
 
 

future.whenComplete((v, e) -> {

                                if (e != null)

                                   System.out.println("exception occurred: " + e);

                              });
 
 

try {Thread.sleep(10); }            // Zeile 15

catch (InterruptedException ie) {}  // Zeile 16
 
 

future.cancel(true);
 
 

Wir starten mit  runAsync() ein  Runnable , das seinen Start ausgibt, 500ms schläft und dann noch sein Ende ausgibt.  Sollte während des Schlafens eine  InterrupedException erfolgen, beendet sich die Task sofort.  Hier müssen wir jetzt den fluent Programmierstil unterbrechen, um das  CompletableFuture zur gestarteten Task in der lokalen Variablen  future zu speichern.  Auf  future wenden wir die Fehlerbehandlung aus dem vorhergehenden Beispiel an.  Dann schlafen wir 10ms.  Als letztes brechen wir die Ausführung der Task ab, indem wir auf  future die Methode  cancel() aufrufen.  Die Ausgabe des Beispiels ist:

started ...

exception occurred: java.util.concurrent.CancellationException

... finished

Wie beschrieben kann das  cancel() die bereits in einem Thread gestartete Task nicht abbrechen, deshalb läuft sie weiter bis zu ihrem Ende.  Auffallend ist dabei vielleicht, dass die Fehlerbehandlungsstufe ( whenComplete() ) bereits mit dem  cancel() getriggert wird und nicht erst, wenn die vorhergehende Task zu Ende gelaufen ist (siehe Reihenfolge der Ausgabe!).  Wie man sieht, bekommt man als Fehler eine  CancellationException im Gegensatz zur  CompletionException ; die kommt, wenn die vorhergehende Task von einer eigenen Exception abgebrochen wurde.
 
 

Lässt man den  sleep() in Zeile 15+16 weg, bekommt man eine Race Condition .  Je nachdem, ob die Task schneller in einem Thread gestartet wird, oder das  cancel() schnell ist,  bekommt man verschiedene Ergebnisse.  Ist der Threadstart schnell, so ist das Ergebnis das gleiche wie mit  sleep() .  Ist das  cancel() schneller, sieht die Ausgabe so aus:
 
 

exception occurred: java.util.concurrent.CancellationException
 
 

Das heißt, das  cancel() hat dazu geführt, dass die Task erst gar nicht ausgeführt wurde.  In einer realen Situation ist so eine Race Condition natürlich unbefriedigend, besonders wenn die Task Seiteneffekte enthält.  Ob das  cancel() unter diesen Bedingungen überhaupt eine sinnvolle Funktionalität darstellt, muss wohl jeder für sich selbst an seinem eigenen Problem ausmachen.

Weitere Methoden

Der Vollständigkeit halber wollen wir noch darauf hinweisen, dass es neben den Methoden aus  Compl et ionStage und  Future noch weiter Methoden im Benutzer API gibt, zum Beispiel:  join() getNow() .  Wir wollen im Rahmen des Artikels nicht auf sie eingehen.  Wenn man das Benutzer API des  CompletableFuture soweit verstanden hat, wie wir es hier beschrieben haben, dann sollte es nicht schwer fallen, bei Bedarf die restlichen Methoden an Hand der Javadoc zu verstehen.

Das Bibliotheksentwickler (Library Writer) API

Wir haben uns bisher hauptsächlich das Benutzer-API des  CompletableFuture s angesehen.  Daneben gibt es noch ein Bibliotheksentwickler-API. Diese Schnittstelle kann man nutzen, um asynchrone Ergebnisse aus einer eigenen Bibliothek als  CompletableFuture oder  CompletionStage zur Verfügung zu stellen.  Zu diesem API gehört zum Beispiel der Konstruktor und Methoden wie  complete() completeExceptionally() .

 
 

Die Nutzung ist erst einmal ziemlich geradeaus.  Mit dem Konstruktor erzeugt man sich ein  CompletableFuture , das man dem Benutzer zurückgibt.  Wenn zu einem späteren Zeitpunkt das asynchrone Ergebnis vorliegt, setzt man dieses mit  complete() in das  CompletableFuture .  Stellt man fest, dass beim Ermitteln des Ergebnisses ein Fehler aufgetreten ist, so kann man diesen mit  completeExceptionally() (passende Exception als Parameter) alternativ signalisieren.
 
 

In der Praxis wird es unter Umständen nicht immer so einfach gehen.  Deshalb gibt es auch noch die Methoden  obtrudeValue() obtrudeExceptionally() .  Wir wollen aber nicht weiter auf die Details eingehen, da man sich als Bibliotheksentwickler ohnehin selbst ein bisschen mehr in das Thema einarbeiten muss. 

Zusammenfassung

Auf den ersten Blick mag das  CompletableFuture nur wie eine Klasse aussehen, die eine Alternative zum  Future aus Java 5 darstellt.  Aber eigentlich ist es deutlich mehr.  Zum einen ist es möglich, im fluent Programmierstil auf asynchrone Ereignisse zu warten.  Zum anderen kann man damit Programme in eine Abfolge von asynchronen und/oder synchronen Verarbeitungsschritten zu strukturieren.  Es bedarf sicher einer gewissen Gewöhnung und Übung mit diesem neuen Paradigma, aber wie man vielleicht schon in diesem Artikel gesehen hat, ergibt sich durch die Benutzung des  CompletableFuture s ein deutlich höheres Abstraktionsniveau.  Statt sich über die Synchronisation von asynchronen Arbeitsschritten selbst Gedanken machen zu müssen, kann man sie relativ einfach und klar strukturiert in einer Pipeline von Tasks implementieren.

Literaturverweise   

/FJP/ 
Javadoc vom ForkJoinPool
URL: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html
/KRE/      
Übersicht über das Stream API in Java 8
Klaus Kreft & Angelika Langer, Java Magazin, Mai 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/74.Java8.Streams-Overview/74.Java8.Streams-Overview.html
/LEA1/  
CompletableFuture, Doug Lea
URL: http://cs.oswego.edu/pipermail/concurrency-interest/2012-December/010423.html   
/LEA2/
CompletionStage, Doug Lea
URL: h ttp://cs.oswego.edu/pipermail/concurrency-interest/2013-July/011496.html
/LEA3/   
Mixing Checked Exceptions and Lambdas Is Too Nightmarish, Doug Lea


URL: http://cs.oswego.edu/pipermail/concurrency-interest/2012-December/010486.html

/TCF/
Javadoc von CompletionState.toCompletableFuture()


URL: http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#toCompletableFuture--  

Die gesamte  Serie über Java 8:

/JAV8-0/ Neue Features in Java 8 - Überblick
Klaus Kreft & Angelika Langer, Java Magazin, März 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/73.Java8.Overview/73.Java8.Overview.html
/JAV8-1/ Funktionale Programmierung in Java
Klaus Kreft & Angelika Langer, Java Magazin, September 2013
URL: http://www.angelikalanger.com/Articles/EffectiveJava/70.Java8.FunctionalProg/70.Java8.FunctionalProg.html
/JAV8-2/ Lambda-Ausdrücke und Methoden-Referenzen
Klaus Kreft & Angelika Langer, Java Magazin, Dezember 2013
URL: http://www.angelikalanger.com/Articles/EffectiveJava/71.Java8.Lambdas/71.Java8.Lambdas.html
/JAV8-3/ Default-Methoden und statische Methoden in Interfaces
Klaus Kreft & Angelika Langer, Java Magazin, Februar 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/72.Java8.DefaultMethods/72.Java8.DefaultMethods.html
/JAV8-4/ Übersicht über das Stream API in Java 8
Klaus Kreft & Angelika Langer, Java Magazin, Mai 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/74.Java8.Streams-Overview/74.Java8.Streams-Overview.html
/JAV8-5/ Stream-Erzeugung und Stream-Operationen
Klaus Kreft & Angelika Langer, Java Magazin, Juli 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/75.Java8.Fundamental-Stream-Operations/75.Java8.Fundamental-Stream-Operations.html
/JAV8-6/ Stream-Kollektoren und die Stream-Operation collect()
Klaus Kreft & Angelika Langer, Java Magazin, September 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/76.Java8.Stream-Collectors/76.Java8.Stream-Collectors.html
/JAV8-7/ Stateful Lambdas - Regeln für die Seiteneffekte in Lambda-Ausdrücken, die an Stream-Operationen übergeben werden
Klaus Kreft & Angelika Langer, Java Magazin, November 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/77.Java8.Streams-and-Statefulness/77.Java8.Streams-and-Statefulness.html
/JAV8-8/ Das Date/Time API
Klaus Kreft & Angelika Langer, Java Magazin, Januar 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/78.Java8.Date-Time-API/78.Java8.Date-Time-API.html
/JAV8-9/ CompletableFuture
Klaus Kreft & Angelika Langer, Java Magazin, März 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/79.Java8.CompletableFuture/79.Java8.CompletableFuture.html
/JAV8-10/ Optional<T>
Klaus Kreft & Angelika Langer, Java Magazin, Mai 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/80.Java8.Optional-Result/80.Java8.Optional-Result.html

 
 

If you are interested to hear more about this and related topics you might want to check out the following seminar:
Seminar
Lambdas & Streams - Java 8 Language Features and Stream API & Internals
3 day seminar ( open enrollment and on-site)
Java 8 - Lambdas & Stream, New Concurrency Utilities, Date/Time API
4 day seminar ( open enrollment and on-site)
Effective Java - Advanced Java Programming Idioms 
4 day seminar ( open enrollment and on-site)
 
Related Reading
Lambda & Streams Tutorial & Reference
In-Depth Coverage of all aspects of lambdas & streams
Lambdas in Java 8
Conference Presentation at JFokus 2012 (slides)
Lambdas in Java 8
Conference Presentation at JavaZone 2012 (video)
 

 
  © Copyright 1995-2016 by Angelika Langer.  All Rights Reserved.    URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/79.Java8.CompletableFuture/79.Java8.CompletableFuture.html  last update: 2 Aug 2016