|
|||||||||||||||||||
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | CONTACT | | | | |||||||||||||||||||
|
Effective Java
|
||||||||||||||||||
In unserer Reihe über das Stream API von Java 8 haben wir bereits erläutert, was Streams sind (siehe / KRE4 /), wie man sie erzeugt und welche Operationen sie haben (siehe / KRE5 /). Wir haben gesehen, dass jeder Stream-Operation in der Regel eine Funktion übergeben wird, die wir üblicherweise als Lambda-Ausdruck oder Methoden-Referenz ausdrücken. Diese Funktionen müssen gewissen Bedingungen bezüglich ihrer Seiteneffekte genügen. In diesem Artikel wollen wir uns ansehen, welche Anforderungen es sind und warum sie existieren. Seiteneffekte
Lambda-Ausdrücke in Java, wie auch das
grundsätzliche Programmierparadigma der Streams, sind stark von den Ideen
der Funktionalen Programmierung abgeleitet. Ein wichtiges Grundprinzip
der Funktionalen Programmierung ist (sehr vereinfacht ausgedrückt): Seiteneffekte
sind schlecht und deshalb weitestgehend unmöglich, oder falls möglich,
doch zu vermeiden. Ein solcher Ansatz (seiteneffektfreie Programmierung)
passt nicht so ganz zu Java als objektorientierte Programmiersprache.
So hat Java zum Beispiel kein explizites Konzept für unveränderliche
Typen und mit
final
hat es auch nur ein
sehr schwaches Konzept für unveränderliche Variablen.
Was blieb also anderes, als einen pragmatischen
Kompromiss zu suchen. Das bedeutet im Fall von Lambda-Ausdrücken (und
Methoden Referenzen), die an Stream-Operationen übergeben werden: Manche
Seiteneffekte sind zulässig. Manche Seiteneffekte sind ganz verboten.
Manche Steiteneffekte sind zwar zulässig, aber wenn möglich zu vermeiden.
Zum Teil hängt dies auch noch von der spezifischen Stream-Operation ab.
Schauen wir uns die Details an. Die Seiteneffekte,
die verboten sind, werden in der Javadoc der Stream-Operation bei der Beschreibung
des Funktions-Parameters aufgelistet. Zum Beispiel findet man in der
Javadoc der Methode
f
ilter()
zum Parameter
predicate
folgende Erläuterung:
predicate
-
a
non-interfering, stateless predicate to apply to each element to determine
if it should be included
Die entscheidenden Worte sind non-interfering und stateless . Sie beschreiben die verbotenen Seiteneffekte. Schauen wir sie uns genauer an. Fangen wir dabei mit Statelessness an. Statelessness
Die Statelessness (Zustandslosigkeit) ist
eine Anforderung, die von den meisten Funktionen verlangt wird, die an
Stream-Operationen übergeben werden. Ausgenommen sind nur die Funktionen,
die an
forEach
()
,
forEachOrdered
()
und
peek
()
übergeben werden.
Zustandslosigkeit heißt, dass die Ausführungen
der Funktion auf den verschiedenen Sequenz-Elementen nicht voneinander
abhängig sein dürfen. Insbesondere dürfen die Funktionen keine Daten
akkumulieren oder modifizieren.
Hier sind Beisp
iele
für zustandslose Funktionen, die die Statelessness-Anforderung erfüllen:
List<String> streamSource = …; streamSource.parallelStream() .filter( w->w.length()>0 ) .map( w->w.charAt(0) )
.forEach(
System.out
::
print
);
Das Prädikat, das an die
filter
-Operation
übergeben wird, liest einen String und bestimmt dessen Länge. Das Prädikat
ist zustandslos und hat keinerlei Seiteneffekte. Man kann es beliebig
oft und in beliebiger Reihenfolge auf die Sequenz-Elemente anwenden; es
kommt für jedes Sequenz-Element immer seine Länge heraus. Gleiches
gilt für das Mapping, das der
map
-Operation
übergeben wird. Es kommt immer der Anfangsbuchstabe des jeweiligen Strings
heraus. Die Funktion, die an die
forEach
-Operation
übergeben wird, hat hingegen Seiteneffekte: sie gibt das Zeichen auf
System.out
aus. Jetzt ist nicht mehr egal, wie oft und in welcher Reihenfolge die
Funktion ausgeführt wird; es wirkt sich auf Ausgabe aus, die auf
System.out
erscheint. Die Ausführungen der
print
-Methode
auf den verschiedenen Stream-Elementen sind jedoch nicht voneinander abhängig.
Deshalb ist der Seiteneffekt der
print
-Methode
vergleichsweise harmlos. Das Problem mit Seiteneffekten und der Verletzung
der Statelessness-Anforderung ist, dass sie nicht immer harmlos sind, sondern
im Gegenteil sehr leicht zu Fehlern führen können.
Hier ist ein Beispiel für eine zustandsbehaftete
Funktion, die die Statelessness-Anforderung verletzt:
List<String> streamSource = …; Set<String> wordsAlreadySeen = new HashSet<>(); streamSource.stream() .filter(w-> {if( wordsAlreadySeen .contains(w)) { return false; } else { wordsAlreadySeen .add(w); return true; })
.forEach(System.out::println);
Aus einer Sequenz von Strings sollen alle
Duplikate entfernt werden. Dafür wird ein Set zur Hilfe genommen, in
dem eingetragen wird, welche Strings schon vorgekommen sind. Nur diejenigen
Strings, die noch nicht aufgetreten sind, werden in den Downstream weitergegeben.
Das gezeigte Prädikat ist ein Beispiel für die Verletzung der Statelessness-Anforderung,
denn es hängt vom Zustand des Sets ab, welches Ergebnis das Prädikat
für ein bestimmtes Sequenz-Element liefert.
Klarerweise ist die Modifikation des Sets
ein Seiteneffekt und dieses Mal ist es kein harmloser Seiteneffekt. Erstens
ist ein
HashSet
nicht thread-safe und
zweitens hat die check-and-react-Sequenz ("wenn enthalten, dann eliminieren,
sonst weitergeben") bei paralleler Ausführung eine Race Condition: in
dem Moment, wo das Prädikat
true
zurückgibt,
stimmt u.U. die Bedingung "ist nicht enthalten" schon nicht mehr, trotzdem
wird
true
zurück gegeben und so können
dann sporadisch Duplikate im Downstream auftauchen.
Das sind subtile Fehler, die bei paralleler
Ausführung der
filter
-Operation auftreten.
Wie in den vorhergehenden Artikeln schon erwähnt, sollte man sich grundsätzlich
bemühen, Stream-Operationen so zu nutzen, dass sie für sequentielle und
parallele Streams gleichermaßen funktionieren. Immerhin könnten Kollegen
auf die Idee kommen,
parallelStream()
statt
stream()
auf der
streamSource
aufzurufen, weil sie sich von der Parallelisierung eine Performancesteigerung
versprechen. Aus diesem Grund ist die Statelessness grundsätzlich gefordert,
auch wenn die Probleme bzw. Fehler bei Nutzung eines sequentiellen Streams
(wie in dem Beispiel oben) noch nicht auftreten.
Oftmals stellt sich bei näherem Hinsehen
heraus, dass es gar nicht nötig ist, Funktionen mit Seiteneffekten zu
verwenden (wie das Prädikat in unserem Beispiel oben). Um Duplikate
aus einem Stream zu entfernen, gibt es die Stream-Operation
distinct
.
Wir können das obige Beispiel alternativ so implementieren:
List<String> streamSource = …; streamSource.parallelStream() . distinct ()
.forEach(System.out::println);
Auf diese Weise ist es viel einfacher und
sicherer. Eine Schwierigkeit im Umgang mit den Streams ist sicherlich,
dass man u.U. nicht alle Möglichkeiten kennt, die das Stream-API bietet.
Deshalb werden zustandsbehaftete Funktionen verwendet, obwohl es vielleicht
gar nicht nötig ist.
Kommen wir noch mal zu den Stream-Operationen,
die keine Zustandslosigkeit von ihren Funktionsparametern fordern. Dies
sind die den Stream-Operationen
peek()
und
forEach()
/
forEachOrdered()
.
peek()
ist eine
intermediären Stream-Operation, die verwendet wird, um Zwischenergebnisse
mitzutracen. Zum Beispiel könnten wir unser erstes Beispiel so abwandeln:
List<String> streamSource = …; streamSource.parallelStream() .filter(w->w.length()>0) . peek(w-> trace . trace ( "word passed: "+w )) .map(w->w.charAt(0))
.forEach(System.out::print);
Jetzt tracen wir mit, welche Worte durch
filter()
durchgehen und bei
map()
ankommen.
Grundsätzlich ist es möglich, dass die trace-Funktionalität stateful
ist. Zum Beispiel könnten die Trace-Meldungen client-seitig gepuffert
werden, bevor sie an den Trace-Server gesandt werden. Deshalb wird für
peek()
keine Zustandslosigkeit gefordert. Wichtig ist dabei aber folgende Bemerkung
aus der Javadoc von
peek()
:
If
the action modifies shared state, it is responsible for providing the required
synchronization.
Das heißt, falls der an
peek()
übergebene Lambda-Ausdruck einen Zustand hat, der verändert werden kann,
so ist man selbst dafür verantwortlich, diesen Zustand thead-sicher zu
machen.
Die Situation bei
forEach()
/
forEachOrdered()
ist im Prinzip die gleiche wie bei
peek()
.
Der übergebene Lambda-Ausdruck darf Zustand haben und man ist selbst für
die Thread-Sicherheit des Zustands verantwortlich. Schauen wir uns dazu
noch ein Beispiel aus dem letzten Artikel (/
KRE6
/)
an:
List<String> streamSource = …; StringBuffer sb = new StringBuffer() ; streamSource. parallelStream() .forEach (s -> sb.append(s) );
String resultString =
sb.toString();
Hiermit sammeln wir die Strings aus der
streamSource
in einem
StringBuffer
auf. Dieser
StringBuff
er
repräsentiert den veränderbaren Zustands des Lambda-Ausdrucks. Da die
append()
Methode des
StringBuffers
synchronized
ist, ist der konkurrierende Zugriff auf den
StringBuffer
thread-sicher. Trotzdem sollte man grundsätzlich auch bei
forEach()
im Fall von stateful Lambda-Ausdrücken kritisch sein und sich fragen,
ob es nicht eine bessere Lösung gibt. Wie wir im letzen Artikel ausführlich
diskutiert haben, gibt es eine performantere Lösung. Sie sieht folgendermaßen
aus:
List<String> streamSource = …; String resultString = streamSource.parallelStream()
.collect(Collectors.joining())
;
Bei einem Benchmark auf einem Zwei-Core-Prozessor mit einem parallelen Stream, der 20.000 Elemente enthielt, war diese Lösung mit dem joining -Kollektor etwa 5,2 mal schneller als die vorhergehende mit dem StringBuffer . Die explizite Synchronisation des Zustands bremst die Lösung mit dem StringBuffer aus. Man sieht also, selbst wenn (veränderlicher) Zustand zulässig ist, sollt man ihn nur nutzen, wenn man ansonsten keine bessere Lösung findet. Non-Interference
Erinnern wir uns noch mal an den Anfang
dieses Artikels. Von einem
predicate
,
das an die
f
ilter()
Methode übergeben wird, fordert die Javadoc, dass es neben
stateless
auch
non-interfering
ist. Diese Non-Interference-Anforderung (man
könnte es vielleicht als "Nichteinmischungsgebot" übersetzen) wird generell
an allen Funktionen gestellt, die an Stream-Operationen übergeben werden.
Die Non-Interference-Anforderung bedeutet, dass die betreffende Funktion
keine Modifikationen an der unterliegenden Datenquelle ausführen darf.
Zur Erinnerung: in /
KRE4
/
hatten wir erläutert, dass Streams keine Datenspeicher sind, sondern lediglich
auf Datenspeicher (Collections, Arrays, etc.) verweisen. Die Non-Interference-Anforderung
bezieht sich auf diesen unterliegenden Datenspeicher (die sogenannte
Stream
Source
). Funktionen, die an Stream-Operationen übergeben werden
(und von denen Non-Interference verlangt wird), dürfen die unterliegende
Datenquelle nicht verändern.
Hier ist ein Beispiel für eine Funktion,
die die Non-Interference-Anforderung verletzt:
List<Integer> underlyingList = new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7,8,9)); underlyingList .stream() .map(i->2*i)
.forEach(
underlyingList
::add);
// <=
ConcurrentModificationException
Der
forEach
-Operation
wird als Consumer die Methoden-Referenz
underlyingList::add
übergeben. Die
add
-Methode verletzt
ganz offensichtlich die Non-Interference-Anforderung, denn sie versucht,
die Ergebnisse des vorangegangenen Mappings in die unterliegende Liste
einzufügen. Der gezeigte Beispielcode scheitert beim Ablauf mit einer
ConcurrentModificationException
.
Die Exception ist nicht verwunderlich: wir modifizieren die Liste während
der Iteration über die Listenelemente; das führt auch bei Imperativer
Programmierung und Benutzung einer
for
-Schleife
zu einer
ConcurrentModificationException
.
Die Exception ist Teil des Fail-Fast-Verhaltens des Collection-Iterators.
Das Auftreten der
ConcurrentModificationException
ist aber nur einer der möglichen Effekte, die bei Verletzung der Non-Interference-Anforderung
auftreten können. Es sind auch andere Fehler (z.B. inkonsistente Objekte,
seltsame Ergebnisse, usw.) denkbar.
Hier sind zur Illustration noch weitere Beispiele:
List<Integer> unmodifiableStreamSource = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(4, 5, 6, 7, 8, 9, 0))); unmodifiable StreamSource .stream() .map(i->2*i) .forEach( unmodifiable StreamSource ::add); // <= UnsupportedOperationException
List<Integer> synchronizedStreamSource = Collections.synchronizedList(new ArrayList<>(Arrays.asList(4, 5, 6, 7, 8, 9, 0))); synchronizedStreamSource .stream() .map(i->2*i)
.forEach(
synchronizedStreamSource
::add);
// <=
ConcurrentModificationException
Eine unveränderliche Collection verkraftet
selbstverständlich überhaupt keine Modifikationen; der Versuch einer
Modifikation scheitert mit einer
UnsupportedOperationException
.
Wie man sieht, genügt auch Thread-Sicherheit nicht: eine
synchronizedList
ist zwar thread-sicher, verkraftet aber trotzdem keine konkurrierenden
Modifikationen während der Iteration.
Die Fehlerindikation muss aber nicht unbedingt
immer so offensichtlich sein, wie in den bisherigen Beispielen. Auf dem
Workshop unserer Lambda-Tour im letzten Jahr in Hamburg wollten die Teilnehmer
sehen, wie sich ein Verstoß gegen die Non-Interference bei Nutzung des
Key-Sets auswirkt, wenn die Stream-Source eine
Concurrent
Hash
Map
ist. Die Frage war: Was macht der folgende Code?
ConcurrentHashMap<Integer,Integer> chm = new ConcurrentHashMap<>(); chm.put(4,4);
chm.put(5,5);
chm.keySet().stream() .map(i->2*i)
.forEach(k->chm.put(k,k));
// Verstoß gegen Non-Interference
System.out.println(chm);
Kommt eine Exception? Werden die Werte
einmal (also für 8 und 10) neu eingefügt? Oder läuft der Code für
immer, weil immer wieder neue Werte in die Map kommen? Nichts von alledem.
Die Ausgabe ist:
{16=16,
4=4, 20=20, 5=5, 8=8, 10=10}
Der Iterator sieht die ersten beiden Werte,
die neu eingefügt werden (8 und 10). Daraus werden wieder neue Werte
erzeugt (16 und 20), die der Iterator dann aber nicht mehr zu sehen bekommt.
Das Ergebnis ist also ziemlich zufällig. Mit anderen Zahlen, anderer
Größe der
HashMap
, usw. kann der Ablauf
ganz anders sein. Dies zeigt, dass, selbst wenn die unterliegende Datenstruktur
robust genug ist, die Non-Interference-Anforderung Sinn macht, weil das
Ergebnis sonst nicht deterministisch ist.
Was kann man tun, um die Interference in
unserem Beispiel zu vermeiden. Schauen wir uns dazu noch mal das Ausgangsbeispiel
ab. Hier kann man zum Beispiel die neu erzeugten Elemente zwischenspeichern
und anschließend an die Stream-Source anhängen:
List<Integer> underlyingList = new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7,8,9)); List<Integer> resultList = underlyingList.stream() .map(i->2*i) . collect (Collectors.toList());
underlyingList.
addAll
(resultList);
Die Ergebnisse des Mappings werden in einer
Liste abgelegt, die anschließend an die unterliegende Liste angehängt
wird.
Modifikation der Sequenz-Elemente
Die Non-Interference-Anforderung bezieht
sich erst einmal nur auf Modifikationen an der unterliegenden Datenquelle,
d.h. es dürfen keine Elemente zur Datenquelle hinzugefügt oder entfernt
werden, während eine Stream-Operation läuft. Nun stellt sich die Frage,
ob die Funktionen, die an die Stream-Operationen als Argumente übergeben
werden, Modifikationen an den Elementen der unterliegenden Datenquelle
ausführen dürfen.
Grundsätzlich ist es keine gute Idee, wenn
zum Beispiel das Prädikat einer Filter-Operation die Elemente der unterliegenden
Datenquelle ändert, auch wenn dieser Seiteneffekt nicht explizit in der
Javadoc verboten ist. Das gilt auch für alle anderen Stream-Operationen
mit Ausnahme von
forEach(
)
/
forEachOrdered()
.
Diese beiden Operationen produzieren nichts, denn ihr Returntyp ist
void
.
Ihre ganze Funktionalität besteht darin, Seiteneffekte zu erzeugen.
Ein solcher Seiteneffekt kann genau darin bestehen, die Stream-Elemente
zu ändern. So kann man zum Beispiel den folgenden Code nutzen, um in
einer Sequenz von Punkten (vom Typ
Point
)
die Koordinaten alle Punkte in der
streamSource
zu verschieben:
Collection<Point> streamSource = …; streamSource.stream()
.forEach(p->p.translate(1,-1));
Dabei sollte man sich überlegt haben, warum
man es macht. Will man die Punkte in der Collection wirklich verändern
oder werden die veränderten Punkte nur erzeugt, um sie weiterzuverarbeiten?
Wenn letzteres der Fall ist, kann man die Stream-Source unverändert lassen.
Stattdessen erzeugt man einfach neue veränderte Punkte durch ein Mapping
und verwendet diese neuen Punkte weiter. Das sieht dann so aus:
Collection<Point> streamSource = …; streamSource.stream() . map(p->new Point(p.x+1,p.y-1))
.forEach(System.out::print);
Natürlich ist es weniger performant, weil
alle Punkte neu erzeugt werden müssen. Dafür sind die Punkte in der
Orignal-Collection unverändert geblieben. Welche der Lösung vorzuziehen
ist, ist von Fall zu Fall zu entscheiden.
Ein Verändern der Punkte in der Collection kann aber auch aus einem anderen Grunde kritisch sein. Solange die unterliegende Datenquelle eine Liste oder ein Array ist, ist es unproblematisch. Wenn die unterliegende Collection aber ein Set ist, dann werden die internen Datenstrukturen des Sets durch die Anwendung der translate -Methode auf die Set-Elemente zerstört. Das ist ein Effekt, der bei allen Collections auftritt, bei denen die Anordnung der Elemente in ihren internen Datenstrukturen (sortierter Binärbaum, Hash-Buckets, etc.) vom Inhalt der Elemente abhängt. Dieses Problem hat genaugenommen nichts mit Streams und Lambda-Ausdrücken zu tun. Es tritt genauso bei Imperativer Programmierung und Benutzung einer for -Schleife auf. Zusammenfassung
Wir haben uns in diesem Beitrag detailliert
angesehen, welche Anforderungen bezüglich Seiteneffekten an die Funktionen
gestellt werden, die wir an Stream-Operationen übergeben. Es werden
Non-Interference und Statelessness verlangt. Selbst wenn Statelessness
nicht ausdrücklich verlangt wird, ist es ratsam, auf Seiteneffekte zu
verzichten, weil der Code dadurch robuster und weniger fehleranfällig
wird.
Wir beenden hiermit vorläufig unsere Reihe
über die Streams in Java, um später noch einmal auf parallele Streams
zurück zu kommen. In den nächsten Beiträgen zum Thema "Java 8 Neuerungen"
wollen wir uns dem Date/Time-API und den Erweiterungen bei den Concurrency
Utilities zuwenden
.
Die gesamte Serie über Java 8:
|
|||||||||||||||||||
© Copyright 1995-2018 by Angelika Langer. All Rights Reserved. URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/77.Java8.Streams-and-Statefulness/77.Java8.Streams-and-Statefulness.html> last update: 26 Oct 2018 |