Angelika Langer - Training & Consulting
HOME | COURSES | TALKS | ARTICLES | GENERICS | LAMBDAS | IOSTREAMS | ABOUT | CONTACT | Twitter | Lanyrd | Linkedin
 
HOME 

  OVERVIEW

  BY TOPIC
    JAVA
    C++

  BY COLUMN
    EFFECTIVE JAVA
    EFFECTIVE STDLIB

  BY MAGAZINE
    JAVA MAGAZIN
    JAVA SPEKTRUM
    JAVA WORLD
    JAVA SOLUTIONS
    JAVA PRO
    C++ REPORT
    CUJ
    OTHER
 

GENERICS 
LAMBDAS 
IOSTREAMS 
ABOUT 
CONTACT 
Effective Java

Effective Java
Java 8
Benutzer-definierte Stream-Kollektoren
 

Java Magazin, Januar 2016
Klaus Kreft & Angelika Langer

Dies ist die Überarbeitung eines Manuskripts für einen Artikel, der im Rahmen einer Kolumne mit dem Titel "Effective Java" im Java Magazin erschienen ist.  Die übrigen Artikel dieser Serie sind ebenfalls verfügbar ( click here ).

 
 

Im letzten Beitrag unserer Serie über Lambdas & Streams in Java (/KLRC/) haben wir die Stream-Operationen  reduce() und  collect() verglichen.  Dabei haben wir  collect() zusammen mit dem  StringBuilder als Zielcontainer benutzt.  Diesmal wollen wir uns ansehen, wie mächtig die Funktionalität von collect() wird, wenn man sie mit einem selbstdefinierten Zielcontainer-Typ kombiniert. 

Zur Erinnerung: wie funktioniert die  collect() -Operation?

Die Stream-Operation  collect() ist eine terminale Operation, die alle Elemente des Input-Streams in einem Zielcontainer aufsammelt.  Wir haben uns das letzte Mal (/KLRC/) angesehen, dass man die Variante

 
 

R               collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner)
 
 

nutzen kann, um beispielsweise die Elemente eines  Stream<String> zu konkatenieren.  Als Zielcontainer von  collect() wird dabei ein  StringBuilder benutzt, der nach dem  collect() wieder mit  toString() in einen String konvertiert wird.  Die Implementierung sieht dann so aus:
 
 

String s = IntStream.range(0, 8)  //.parallel()

                    .mapToObj(Integer::toString)

                    .collect(() -> new StringBuilder(),

                            (StringBuilder sb1, String s1) -> sb1.append(s1),

                             (StringBuilder sb1, StringBuilder sb2) -> sb1.append(sb2))

                    .toString();
 
 

  System.out.println(s);
 
 

Der Operation  collect() werden dabei drei Lambda Ausdrücke übergeben:

supplier , der implementiert, wie ein Objekt des Zielcontainers ( StringBuilder ) erzeugt wird,

accumulator ,  der implementiert, wie ein Element des Streams ( String ) in den Zielcontainer akkumuliert wird,

combiner , der implementiert, wie zwei Zielcontainer ( StringBuilder ) zusammen kombiniert werden.
 
 

Bei einem sequentiellen Stream wird der  supplier von  collect() dazu benutzt, ein Zielcontainer-Objekt zu konstruieren.  Danach wird jedes Stream-Element mit Hilfe des  accumulator s in dem Zielcontainer-Objekt aufgesammelt.   Der  combiner wird nicht gebraucht.
 
 

Bei einem parallelen Stream (siehe /KLPS/) wird die Stream-Source in der Fork-Phase in Segmente aufgeteilt und für jedes Segment wird eine Task erzeugt; diese Tasks werden anschließend in der Execution-Phase parallel ausgeführt.  Bei der Ausführung konstruiert jede Task mit Hilfe des  supplier s ein eigenes Zielcontainer-Objekt und verwendet den  accumulator , um die zur Task gehörenden Stream-Elemente in diesem Zielcontainer-Objekt aufzusammeln. Danach werden in der Join-Phase die Zielcontainer-Objekte aller Tasks mit dem  combiner zum Gesamtergebnis zusammengeführt.
 
 

Bemerkenswert ist dabei die Tatsache, dass der Zielcontainer ein  StringBuilder ist, der verändert wird - sowohl vom  accumulator als auch vom  combiner .  In der Javadoc der  collect() -Operation wird deshalb von einer mutable reduction operation gesprochen.  Das verändernde Verhalten der  collect() -Operation ist deshalb ungewöhnlich, weil das Vorbild für die  Stream -Abstraktion in Java aus funktionalen Programmiersprachen stammt und dort Sequenz (Englisch: sequence ) genannt wird.  Sequenzen in funktionalen Sprachen haben aber typischerweise keine verändernden Operationen.  Bei den Streams in Java ist es anders, denn für Java reicht es nicht aus, nur unveränderliche Operationen im  Stream -Interface anzubieten, weil Java eine objektorientierte Programmiersprache mit veränderlichen Typen ist.  Dann kann es natürlich auch veränderliche Zielcontainer geben und genau dafür ist die collect() -Operation gedacht.  Wir haben uns das verändernde Verhalten von  collect() ausführlich beim letzen Mal angesehen, als wir dem  collect() mit  StringBuilder den  reduce() mit  String gegenübergestellt haben.  Die veränderliche Reduktion mit  collect() und  StringBuilder ist deutlich performanter: wir haben in einem Benchmark 2000  String s aus einem sequentiellen Stream konkateniert und festgestellt, dass  collect() dabei rund 45-mal schneller ist als  reduce() .  Es gibt also gute Gründe für die Existenz einer mutable reduction operation in Java-Streams .
 
 

collect()   und Collector

Da es in diesem Artikel darum gehen soll,  collect() mit einem selbstdefinierten Zielcontainer-Typ zu nutzen, wollen wir uns vorab ansehen, welche Möglichkeiten es grundsätzlich gibt, um einen Zielcontainer-Typ im  collect() zu verwenden.  Wir haben uns bisher nur diese  collect() -Variante aus dem  Stream -Interface angesehen:

 
 

R               collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner)
 
 

Die funktionalen Parameter  supplier accumulator und  combiner werden an  collect() übergeben und in der Operation an geeigneter Stelle in der Implementierung genutzt.  Genau das haben wir uns bereits oben in dem Beispiel angesehen.
 
 

Alternativ gibt es noch eine weitere  collect() -Operation im  Stream -Interface:
 
 

<R,A> R collect(Collector<? super T,A,R> collector)
 
 

Diese Methode hat nur einen Parameter: den  collector vom Typ  Collector .  Vordefinierte Kollektoren (oder genauer gesagt: statische Factory-Methoden, die vordefinierte Kollektoren erzeugen) findet man in der Klasse  java.util.stream.Collectors .  Dort gibt es zum Beispiel die Factory-Methode  joining() , die im Wesentlichen das Gleiche macht, wie wir oben in unserem Beispiel:  String s (sogar etwas allgemeiner  CharacterSequence s) konkatenieren.  Das heißt, wenn man in der Praxis die Elemente eines  Stream<String> konkatenieren will, wird man es sich leicht machen und den vordefinierten  joining() -Kollektor nutzen, statt die  collect() -Operation mit  supplier accumulator und  combiner zu versorgen.  Wir haben die komplizierte Variante hier im Artikel besprochen, weil sie uns später hilft, das Verständnis dafür zu entwickeln, wie ein eigener Zielcontainer-Typ implementiert werden kann.
 
 

Schauen wir uns nun das  Collector -Interface (in  java.util.stream ) etwas genauer an.  Die abstrakten Methoden sind folgende:
 
 

public interface Collector<T, A, R> {
 
 

    Supplier<A> supplier();
 
 

    BiConsumer<A, T> accumulator();
 
 

    BinaryOperator<A> combiner();
 
 

    Function<A, R> finisher();
 
 

    Set<Characteristics> characteristics();
 
 

    …

}
 
 

Die ersten drei Methoden ( supplier accumulator und  combiner ) kennen wir bereits von der  collect() -Operation mit drei funktionalen Parametern. 
 
 

Der  finisher enthält die Funktionalität, die am Ende, wenn alle Elemente des Streams eingesammelt worden sind, auf den Zielcontainer angewendet wird.  In unserem vorhergehenden Beispiel könnte der  finisher das  toString() sein, welches aus dem  StringBuilder einen  String macht, so dass das Ergebnis von  collect() ein  String ist. 
 
 

Die Methode  characteristics liefert einen Set von  Characteristics zurück. Dabei ist  Characteristics ein Enum-Typ, der aus den Werten  CONCURRENT UNORDERED , und IDENTITY_FINISH besteht.  Die  Characteristics sagen der  collect() -Operation, wie sie die Funktionalität des  Collector s nutzen kann: 

IDENTITY_FINISH bedeutet, dass der  Collector lediglich einen trivialen  finisher hat und das Zielcontainer-Objekt schon das Ergebnis des  collect() ist.   Das heißt, der  collect() braucht den  finsisher am Ende gar nicht aufrufen.

CONCURRENT bedeutet, dass der  accumulator thread-safe ist und konkurrierend vom  collect() aufgerufen werden kann.

UNORDERED bedeutet, dass der  Collector bei einem parallelen Stream die Elemente nicht in der Reihenfolge, in der sie im Stream vorkommen, einsammeln kann.  Das kann zum Beispiel der Fall sein, wenn der  Collector CONCURRENT ist und der  accumulator konkurrierend ausgeführt wird.
 
 

Das ist grob die Beschreibung der  Characteristics -Werte.  Die  collect() -Variante mit  supplier accumulator und  combiner als Parameter entspricht der  collect() -Variante mit  Collector , bei dem die Methode  characteristics() wie folgt implementiert ist:

public Set<Characteristics> characteristics() 
           { return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));  }


 
 

Wir werden  CONCURRENT und  UNORDERED in diesem Artikel gar nicht benutzen.  Falls Interesse besteht, im Detail zu sehen, wie diese beiden  Characteristics den  collect() beeinflussen, kann man sich einen der von  toConcurrentMap() bzw.  groupingByConcurrent() erzeugten Kollektoren genauer ansehen.
 
 

Eigene Kollektoren

Der offensichtliche Ansatz, um einen eigenen Kollektor zu implementieren, besteht natürlich darin, eine Klasse zu implementieren, die vom Interface  Collector ableitet und dabei die abstrakten Methoden überschreibt.  Alternativ gibt es eine zweite Möglichkeit:  im Interface  Collector gibt es zwei überladene Versionen der statischen Factory-Methode  of() .  Man übergibt als Parameter die Funktionalität von  supplier accumulator combiner und optional  fin ish er , sowie die  Characteristics des Collectors.  Die Signaturen der beiden  of() -Varianten (ohne bzw. mit  finish er ) sind:

 
 

Collector<T, R, R> of(Supplier<R> supplier, BiConsumer<R, T> accumulator,

                      BinaryOperator<R> combiner, Characteristics... characteristics)
 
 

Collector< T A R > of(Supplier< A > supplier, BiConsumer< A T > accumulator,
                      BinaryOperator< A > combiner, Function< A R > finisher,
                      Characteristics... characteristics)
 
 

Das folgende Beispiel zeigt die Benutzung der  of() -Methode zur Implementierung eines  Collector s für die  String -Konkatenation:
 
 

String s = IntStream.range(0, 8)

             .mapToObj(Integer::toString)

             .collect(Collector.of(() -> new StringBuilder(),

                                   (StringBuilder sb1, String s1) -> sb1.append(s1),

                                   (StringBuilder sb1, StringBuilder sb2) -> sb1.append(sb2),

                                   (StringBuilder sb) -> sb.toString()));
 
 

System.out.println(s);
 
 

Das ganze sieht fast genauso aus wie unser erstes Beispiel oben, nur dass wir diesmal noch einen  fin ish er haben, der das Zielcontainer-Objekt vom Typ  StringBuild er gleich in der  collect() -Operation in einen  String konvertiert.   Characteristics haben wir keine, da keiner der Werte auf unseren  Collector zutrifft.  In diesem Beispiel haben wir nun zwar einen eigenen Kollektor definiert, aber als Zielcontainer-Typ noch immer den  StringBuilder aus dem JDK benutzt.
 
 

Bemerkenswert ist ein kleiner Unterschied, der den  combiner betrifft:  bei der  collect() -Operation hat der  combiner einen anderen Typ als beim  Collector , den man mit der  of() -Methode erzeugt.

Der  combiner der  collect() -Operation ist vom Typ  BiConsumer<R,R> .  Dabei ist  R der Typ des Zielcontainers, in unserm Fall also  StringBuilder . Das heißt, der  combiner nimmt zwei  StringBuilder als Parameter und gibt nichts zurück.

Der  combiner des  Collector s ist hingegen vom Typ  BinaryOperator<A> .  Dabei ist  A wieder der Typ des Zielcontainers ( StringBuilder ).  Das heißt, der  combiner des  Collector s nimmt auch zwei  StringBuilder als Parameter, gibt aber auch einen  StringBuilder als Ergebnis zurück.
 
 

Überraschend ist dabei, dass wir in den beiden vorhergehenden Beispielen den  combiner identisch implementiert haben.  In beiden Fällen haben wir diesen Lambda-Ausdruck verwendet:
 
 

( StringBuilder sb1, StringBuilder sb2) -> sb1.append(sb2)
 
 

Es funktioniert, obwohl verschiedene Signaturen verlangt wurden (einmal mit Returntyp  void und einmal mit Returntyp  StringBuilder ).  Es liegt einfach daran, dass der Rumpf unseres Lambdas lediglich aus einem Ausdruck (Englisch: expression) besteht; das heißt, falls ein Returnwert benötigt wird, wird das Ergebnis des Ausdrucks als Returnwert benutzt.  Das Ergebnis ist in diesem Fall glücklicherweise wieder  St r ingBuilder , denn die  append() -Methode des  StringBuilder s liefert  StringBuilder zurück.  Das heißt, je nach Kontext kann unsere  combiner -Implementierung ein  BiConsumer<StringBuilder,StringBuilder> oder ein  BinaryOperation<StringBuilder>  sein.  Dieses Beispiel illustriert ganz nebenbei, dass Lambda-Ausdrücke in Java Poly Expressions sind, deren Typ vom Kontext abhängt (siehe /KLLM/).
 
 

Es stellt sich die Frage, warum die  combiner überhaupt verschiedene Signaturen haben?  Wir haben bereits gesehen, dass der  collect() mit  Collector mehr Flexibilität ermöglicht, als der  collect() mit  supplier accumulator und  combiner : er erlaubt die Benutzung eines  finalizer sowie mit Hilfe der  Characteristics die konkurrierende Accumulation, usw. .  Größere Flexibilität ist auch der Grund für die unterschiedlichen  combiner Signaturen.  Schauen wir uns dazu noch mal die Beispiele für die  String -Konkatenation mit Hilfe des  StringBuilders an.
 
 

Der  combiner des  collect() nimmt zwei  StringBuilder und hängt den Inhalt des zweiten and den ersten an:
 
 

( StringBuilder sb1, StringBuilder sb2) -> sb1.append(sb2)
 
 

Hierbei geht der  collect() davon aus, dass der erste Parameter  sb1 verändert wird, und zwar so dass er danach auch den Inhalt des zweiten Parameters  sb2 enthält.
 
 

An den  combiner des  Collector s werden vom  collect() nicht so restriktive Anforderungen gestellt.  Der  combiner nimmt zwar auch zwei  StringBuilder als Parameter, sein Returnwert ist aber auch wieder ein  StringBuilder , und zwar der aus den beiden kombinierten Input-Parametern resultierende  StringBuilder .  Wir könnten seine Implementierung folgendermaßen optimieren:
 
 

   (StringBuilder sb1, StringBuilder sb2) ->  {

        if (sb1.length () == 0) return sb2;

        else return  sb1.append(sb2) ;

    }
 
 

Falls  sb1 leer ist, geben wir  sb2 zurück, was effizienter ist, als den Inhalt von  sb2 an das leere  sb1 anzuhängen.  In der Praxis kann es auch wirklich vorkommen, dass  sb1 leer ist. Zum Beispiel, wenn vor dem  collect() eine  filter() -Operation durchgeführt worden ist.  Diese Art der Optimierung ist mit dem  combiner des  collect() nicht möglich; hier muss immer der Inhalt des zweiten Parameters an den ersten angehängt werden.

Selbstdefinierter Zielcontainer-Typ

In die Situation, eigene Zielcontainer-Typen bzw. eigene Kollektoren zu implementieren, kommt man dann, wenn die im JDK definierten Typen für die eigenen Aufgaben nicht mehr ausreichen.  Man muss sich gedanklich vielleicht ein wenig von den in  Collectors unterstützten Zielcontainer-Typen wie  StringBuilder ( joining() ) bzw. den JDK-Collection-Typen ( toCollection() toList() toMap() , …) lösen, um zu erkennen, wie viel Potential in diesem Ansatz liegt. 

Ein einfaches Beispiel

Schauen wir uns zuerst ein relativ einfaches Beispiel an.  Nehmen wir an, wir müssten in einen  Stream<String> das kleinste und größte Element bezüglich der lexikographischen Ordnung finden.  Der erste und einfachste Ansatz ist vielleicht, die Methoden  min() und  max() aus dem  Stream -Interface mit der  compareTo() -Methode des Strings als  Comparator zu nutzen.

 
 

Optional<String> min = myStringStream.min(String::compareTo);
 
 

Optional<String> max = myStringStream.max(String::compareTo);
 
 

System.out.println("min: " + min.get() + " - max: " + max.get());
 
 

Der Nachteil dieses Ansatzes ist, dass wir zweimal über die Elemente des Streams gehen müssen, einmal um das Minimum und einmal um das Maximum zu ermitteln.  Hier kann ein selbstdefinierter Zielcontainer-Typ hilfreich sein, der nur einmal über den Stream  geht und dabei beides - das Minimum und das Maximum - auf einmal aufsammelt.  Einen solchen Zielcontainer-Typ wollen wir jetzt bauen; wir nennen ihn  MinMaxStringCollector .
 
 

Schauen wir uns die dafür benötigte Implementierung des selbstdefinierten Zielcontainers  MinMaxStringCollector an.  Die Klasse hat zwei Felder  min und  max , um darin das Minimum bzw. das Maximum zu speichern, und Getter-Methoden für beide Felder, um am Ende auf die Ergebnisse zugreifen zu können.
 
 

public class MinMaxStringCollector {

    private String min;

    private String max;
 
 

    public Optional<String> getMin() { return Optional.ofNullable(min); }
 
 

    public Optional<String> getMax() { return Optional.ofNullable(max); }
 
 

       …

}
 
 

Die Getter-Methoden haben wir aus Kompatibilität zu der  min() - bzw.  max() -Streamoperation so implementiert, dass sie  Optional<String> zurückliefert.  Die Konvertierung mit  Optional.ofNullable() haben wir schon ausführlich in dem vorhergehenden Artikel über  Optional besprochen (/KLOP/). 
 
 

Weiter wollen wir unseren Zielcontainer so implementieren, dass er Methoden enthält, die wir als  supplier accumulator und  combiner an das  collect() übergeben können.  Für den  supplier brauchen wir nicht explizit eine Methode zu implementieren. Wir benutzen dafür den Default-Konstruktor, der vom Compiler generiert wird. 
 
 

Der  accumulator implementiert die Funktionalität, die ausgeführt wird, wenn ein Element des Streams an den Kollektor übergeben wird:
 
 

        public void accumulate(String s) {

            if (min == null) min = s;

            else if (min.compareTo(s) > 0) min = s;
 
 

            if (max == null) max = s;

            else if (max.compareTo(s) < 0) max = s;

        }
 
 

Falls das Element kleiner ist, machen wir es zum neuen Minimum, falls es größer ist, zum neuen Maximum.  Dabei müssen wir noch beachten, dass  min bzw.  max null sein können, weil der Default-Konstruktor sie so initialisiert hat. 
 
 

Ansonsten setzen wir voraus, dass der Stream keine  null -Elemente enthält und deshalb das übergebene Stream-Element  s nicht  null sein kann. Andernfalls würde eine  NullPointerException ausgelöst.  Damit diese Voraussetzung auch erfüllt ist, müssen wir vor dem  collect() die Elemente, die  null sind, aus dem Stream herausfiltern.  Die Stream-Operationen  min() und  max() machen übrigens die gleichen Annahmen; auch sie setzen voraus, dass der Stream keine  null -Elemente enthält und lösen eine  NullPointerException aus, wenn sie auf ein Stream-Element stoßen, das  null ist.
 
 

Als letztes müssen wir nun noch den  combiner implementieren.  Er „kombiniert“ aus zwei  MinMaxStringCollector -Objekten ( this und  other ) ein Objekt ( this ), das das kleinere Minimum und das größere Maximum enthält:
 
 

        public void combine(MinMaxStringCollector other) {

            if (min == null) min = other.min;

            else if (min.compareTo(other.min) > 0) min = other.min;
 
 

            if (max == null) max = other.max;

            else if (max.compareTo(other.max) < 0) max = other.max;

        }
 
 

Jetzt können wir unseren selbstdefinierten Zielcontainer in einem  collect() nutzen:
 
 

MinMaxStringCollector mmsc = myStringStream.filter(s -> s != null)

                                           .collect(MinMaxStringCollector::new,

                                                    MinMaxStringCollector::accumulate,

                                                    MinMaxStringCollector::combine);
 
 

System.out.println("min: " + mmsc.getMin().get() + " - max: " + mmsc.getMax().get());
 
 

Wir haben übrigens – wie angekündigt - vor dem  collect() explizit die Elemente aus dem Stream gefiltert, die  null sind, um etwaige  NullPointerException zu vermeiden.
 
 

Die hier gezeigte Technik, mehrere Durchläufe durch einen einzigen  collect() mit selbstdefiniertem Zielcontainer-Typ zu ersetzen, lässt sich universell verwenden.  Der Vorteil ist, wie bereits gesagt, dass man nur einmal über den Stream laufen muss, was bei einem großen Stream einen deutlichen Performancevorteil ausmachen kann.

Ein komplexeres Beispiel, Teil 1

Es gibt Situationen, in denen man nicht unbedingt auf die Idee kommt, dass ein selbstdefinierter Zielcontainer-Typ die beste - unter Umständen sogar die einzige - Lösung ist.  Meist sind es Situationen, in denen man etwas mit veränderlichem Zustand (Englisch: state   bzw. shared mutable state ) machen will.  So kam mal in einem unserer Seminare die Frage auf, wie man mit Streams die Folgezeile einer Datei mit ihrer Anfangszeile verketten kann. 

 
 

Konkret sah das Problem so aus, dass es sich um eine Tracedatei handelte, bei der jeder Traceeintrag am Anfang einer Zeile beginnt.  Wenn der Traceeintrag zu lang für eine Zeile ist, wird umgebrochen und der Traceeintrag wird in der nächsten Zeile fortgesetzt, wobei die Fortsetzungszeile mit einem Blank ( ' ' ) beginnt.  Dabei ist es durchaus möglich, dass ein Eintrag über mehr als zwei Zeilen geht, wobei jede weitere Folgezeile wieder mit einem Blank beginnt.  Wie kann man mit Hilfe von Streams die Traceeinträge wieder zu einer Zeile zusammenfassen, um sie anschließend weiterzuverarbeiten?
 
 

Ein Hinweis darauf, dass diese Aufgabe mit einem  collect() zu lösen ist, findet man, wenn man sich überlegt, dass wir mit dem Aneinanderketten der Folgezeilen auf einem veränderlichen Zustand operieren.  Was wir nun noch brauchen, ist ein Zielcontainer-Typ, der die umgebrochenen Traceeinträge zu einer Zeile zusammenfasst. 
 
 

Wir werden die Methode  Stream<String> BufferedReader.lines() nutzen, um unsere Tracedatei als  Stream<String> bearbeiten zu können.  Unser Zielcontainer-Typ muss nun diese  String s hernehmen und sie wenn nötig (d.h. falls das erste Zeichen ein Blank ist) zu Traceeinträgen konkatenieren.  Unseren Zielcontainer-Typ wollen wir  TraceEntryCollector nennen.  Er enthält eine  List<StringBuilder> , in der wir die konkatenierten Traceeinträge sammeln:
 
 

public class TraceEntryCollector {
 
 

    private List<StringBuilder> theList = new ArrayList<>();
 
 

       …

}
 
 

Jeder Listeneintrag repräsentiert einen Traceeintrag.  Als Ergebnis unseres Kollektors wollen wir aber ein  String[] zurück liefern.  Dafür implementieren wir eine Methode  getResult() , die später als  finalizer genutzt wird:
 
 

        public String[] getResult() {

            int size = theList.size();

            String[] array = new String[size];

            for (int i=0; i<size; i++) {

                array[i] = theList.get(i).toString();

            }

            return array;

        }
 
 

Kommen wir nun zum  accumulator .  Er sieht so aus:
 
 

        public void accumulate(String s) {

            if (s.length() > 0) {                                           // Zeile 2

                if (s.charAt(0) != ' ') theList.add(new StringBuilder(s));  // Zeile 3

                else {

                    int size = theList.size();
 
 

                    if(size == 0) theList.add(new StringBuilder(s));        // Zeile 7

                    else theList.get(size - 1).append(s);                   // Zeile 8

                }

            }

        }
 
 

Der Grundgedanke ist, dass

eine Zeile, die nicht mit einem Blank beginnt, einen neuen Eintrag in der Liste erzeugt (Zeile 3)  und

eine Zeile, die mit einem Blank beginnt, an die vorhergehende angehängt wird (Zeile 8).
 
 

Zusätzliche sind noch zwei Sonderfälle zu beachten.  Zum einen werden Leerzeilen gleich zu Anfang herausgefiltert (Zeile 2).  Zum anderen kann bei parallelen Streams das Aufsplitten der Streamsource dazu führen, dass die erste Zeile eine Folgezeile ist, also mit einem Blank beginnt.  Dies wird in Zeile 7 abgehandelt.  
 
 

Schauen wir uns als nächstes den  combiner an. Er sieht so aus:
 
 

        public TraceEntryCollector combine(TraceEntryCollector other) {

            if (other.theList.size() != 0) {                 // Zeile 2

               if (theList.size() == 0)                     // Zeile 3

                    return other;
 
 

                if (other.theList.get(0).charAt(0) == ' ') {

                    int idx = theList.size() - 1;            // Zeile 7

                    theList.set(idx, theList.get(idx).append(other.theList.get(0)));

                    for (int i = 1; i < other.theList.size(); i++)

                        theList.add(other.theList.get(i));

                } else {

                    this.theList.addAll(mc.theList);         // Zeile 12

                }

            }
 
 

            return this;

        }
 
 

Die zentrale Funktionalität des  combiners besteht darin, an den  this - TraceEntryCollector die Zeilen des als Parameter übergebenen  other - TraceEntryCollector anzuhängen.  Dies wird in Zeile 12 gemacht.  Etwas aufwändiger ist dies, wenn der  other - TraceEntryCollector mit einer Folgezeile beginnt.  Der Code dafür ist in Zeile 7ff. zu finden.  Zusätzlich gibt es noch Optimierungen für den Fall, dass der  other - bzw.  this - TraceEntryCollector leer sind: Zeile 2 bzw. Zeile 3.
 
 

Nun wollen wir unseren neuen Zielcontainer benutzen:
 
 

try (BufferedReader reader = new BufferedReader(new FileReader("trace.log"))) {

    String[] result = reader.lines()

                           .collect(Collector.of(TraceEntryCollector::new,

                                                 TraceEntryCollector::accumulate,

                                                  TraceEntryCollector::combine,

                                                 TraceEntryCollector::get));
 
 

} catch (IOException | UncheckedIOException e) {

    e.printStackTrace();

}
 
 

Wir benutzen die  Collector.of() Factory Methode, um aus dem Zielcontainer einen  Collector zu erzeugen.  Wie oben schon erwähnt, verwenden wir die  BufferedReader.lines() Methode, um die Datei einzulesen.  Sie ermöglicht es, die Tracedatei zeilenweise als einen  Stream<String> zu bearbeiten.
 
 

An dem Beispiel kann man sehen, dass es mit Hilfe eines eigenen Zielcontainer-Typs möglich ist, eine zustandsbehaftete Funktionalität zu implementieren, die das Parsen und Verarbeiten der Datei erledigt.  Bemerkenswert ist, dass das Ganze sogar parallel funktioniert, d.h. wenn wir nicht nur  reader.lines() sondern  reader .lines().parallel() verwenden.  Eine Performanceverbesserung durch die Parallelisierung können wir in diesem Beispiel allerdings nicht unbedingt erwarten, da das Lesen aus der Datei vermutlich der Engpass bei der Verarbeitung ist.

Ein komplexeres Beispiel, Teil 2

Im Allgemeinen ist es bei der Verarbeitung einer Tracedatei nicht damit getan, die Traceeinträge aus den Zeilen zusammen zusetzten.  Typischerweise fängt erst danach die eigentliche Arbeit mit der Auswertung des Traces erst an.  Auch dies lässt sich in einem Kollektor- bzw. Zielcontainer-Typ erledigen.  Sinnvoll ist es dabei sogar, wenn die gesamte Arbeit (Konkatenieren und Verarbeiten) in einem einzigen Zielcontainer erledigt wird.  Der Vorteil ist, dass nicht erst eine Speicherstruktur (in unserem Fall das  String -Array der Traceeinträge) erzeugt werden muss, die anschließend wieder Input für die Weiterverarbeitung ist, sondern es kann alles in einem Durchlauf erledigt werden.

 
 

Es gibt übrigens auch eine Alternative zur Implementierung der gesamten Funktionalität in einem einzigen Kollektor bzw. Zielcontainer, die wir an dieser Stelle nicht besprechen wollen, weil sie den Rahmen des Artikels sprengen würde.  Aber wir wollen sie der Vollständigkeit halber zumindest erwähnen.  Man kann Kollektoren bauen, die weiteren Downstream-Kollektoren verwenden.  Wir haben uns die Benutzung von Downstream-Kollektoren bereits in einem vorhergehenden Artikel angesehen (/KLKO/).  Sie selbst zu implementieren, ist relativ aufwändig; deshalb werden wir hier keinen Kollektor mit Downstream-Kollektor besprechen.  Bei einer solchen Lösung mit Downstream-Kollektoren würde die Abarbeitung ebenfalls in einem einzigen Durchlauf erfolgen - so als wäre die gesamte Funktionalität aller Kollektoren in einem einzigen Kollektor implementiert.  Ein Vorteil ergibt sich bei der Benutzung, denn sie ist etwas strukturierter und damit übersichtlicher.
 
 

Wir wollen uns aber nun anschauen, wie ein Zielcontainer aussieht, der nicht nur konkateniert, sondern anschließend auch noch weitere Verarbeitungsfunktionalität enthält.  Nehmen wir an, die Verarbeitungsfunktionalität besteht darin:

wie bisher die Zeilen eines Traceeintrags zu einer Zeile zusammenzufassen und

anschließend zusätzlich die Länge des Traceeintrages zu bestimmen. 
 
 

Unser oben gezeigter  TraceEntryCollector kann nur konkatenieren und wir mussten die Nachverarbeitung der Längenbestimmung anschließend selber machen.  Das sieht so aus:
 
 

String[] result = reader.lines()

                        .collect(Collector.of(TraceEntryCollector::new,

                                             TraceEntryCollector::accumulate,

                                              TraceEntryCollector::combine,

                                             TraceEntryCollector::get));
 
 

int[] lens = new int[result.length()]

for (int i=0; i< result.length(); i++)

    lens[i] = result[i].length();
 
 

Unser neuer Zielcontainer-Typ soll  TraceEntryLengthCounter heißen und am Ende ein Array mit den Längen der konkatenierten Traceeinträge liefern.  Dafür brauchen wir eine  List<Integer> , in der wir die Längen der Traceeinträge sammeln: 
 
 

public static class TraceEntryLengthCounter {
 
 

        private List<Integer> theList = new ArrayList<>();

        private boolean successive = false;
 
 

       …

}
 
 

Da wir die Originalzeilen aus der Datei gleich weiterverarbeiten wollen, indem wir ihre Länge bestimmen, verlieren wir die Identifikation, ob es sich um eine Folgezeile (beginnend mit Blank) handelt oder nicht.  Das ist ein Problem bei der ersten Zeile, die an unseren Zielcontainer übergeben wird.  Deshalb speichern wir diesen Aspekt in dem  boolean Feld  successive .
 
 

Der  accumulator sieht dann so aus:
 
 

        public void accumulate(String s) {

            if (s.length() > 0) {

                if (s.charAt(0) != ' ')         

                    theList.add(s.length());         // Zeile 4

                else {

                    int size = theList.size();

                    if (size == 0) {

                        successive = true;           // Zeile 8

                        theList.add(s.length());

                    } else {

                        theList.set(size - 1, theList.get(size - 1) + s.length());

                    }

                }

            }

        }
 
 

Wenn es sich nicht um eine Folgezeile handelt, wird ein neuer Eintrag mit der Länge der Zeile in  theList erzeugt (Zeile 4).  Bei einer Folgezeile gilt es zu unterscheiden, ob es sich um die allererste Zeile handelt, die an den  TraceEntryLengthCounter übergeben wird, oder nicht.  Wenn es sich um die allererste Zeile handelt, wird  successive auf  true gesetzt und ein neuer Eintrag mit der Länge der Zeile in  theList erzeugt (Zeile 8 folgende).  Wenn nicht, wird zu dem letzten Eintrag von  theList die Länge der Zeile dazu addiert.
 
 

Hier ist der dazugehörige  combiner :
 
 

        public TraceEntryLengthCounter combine(TraceEntryLengthCounter other) {

            if (other.theList.size() != 0) {                // Zeile 2

                if (theList.size() == 0) {                  // Zeile 3

                     return other;

                }

                if (other.successive) {

                    int idx = theList.size() - 1;           // Zeile 7

                    theList.set(idx, theList.get(idx) + other.theList.get(0));

                    for (int i = 1; i < other.theList.size(); i++)

                        theList.add(other.theList.get(i));

                } else {

                    theList.addAll(other.theList);          // Zeile 12

                }

            }
 
 

            return this;

        }
 
 

Wenn der als Parameter übergebene  TraceEntryLengthCounter nicht mit einer Folgezeile beginnt, werden die Einträge seiner Liste einfach an die Liste von  this angehängt (Zeile 12).  Falls er mit einer Folgezeile beginnt, ist das Ganze etwas aufwändiger (Zeile 12 folgende): der erste Eintrag der Liste von  other wird zum letzten Eintrag der Liste von  this dazu addiert.  Die weiteren Listeneinträge von  other werden an die Liste von  this angehängt.
 
 

Aus Optimierungsgründen wird noch geprüft, ob  other (Zeile 2) oder  this (Zeile 3) leer ist.  Wenn ja, wird das jeweils andere  TraceEntryLenghtCounter -Objekt zurückgegeben.
 
 

Es gibt auch noch einen  finalizer . Er wandelt die  List<Integer> in ein  int[] und gibt es zurück:
 
 

        public int[] get() {

            int size = theList.size();

            int[] array = new int[size];

            for (int i=0; i<size; i++) {

                array[i] = theList.get(i);

            }

            return array;

        }
 
 

Benutzt wird der  TraceEntryLengthCounter wie der  TraceEntryCollector , indem aus dem Zielcontainer mit Hilfe von  Collector.of() ein  Collector konstruiert wird.
 
 

Das Beispiel des  TraceEntryLengthCounter zeigt, dass auch die Weiterverarbeitung innerhalb eines Zielcontainers erfolgen kann.  Wie oben schon gesagt, ist der Vorteil dieses Ansatzes, dass die gesamte Verarbeitung in einem Lauf über den Input-Stream erfolgt und keine unnötige Datenstruktur im Speicher aufgebaut werden muss. 

Zusammenfassung

In diesem Artikel haben wir uns angesehen, wie mächtig das  collect() mit einem selbstdefinierten Zielcontainer-Typ werden kann.  In den meisten Fällen, in denen man zustandsabhängige Funktionalität zusammen mit Streams verwenden will, ist  collect() mit einem selbstdefinierten Zielcontainer-Typ der beste - oft sogar der einzige - sinnvolle Ansatz.  Zugegeben, die Implementierung von  accumulator und  combiner kann bei anspruchsvollen Problemen recht komplex werden.  Die Komplexität rührt daher, dass die Verarbeitung bei Verwendung eines parallelen Streams dann aber auch parallel funktioniert.  Etwas Vergleichbares (parallele Verarbeitung aller Elemente einer Sequenz) wäre mit einer Schleife und imperativem Programmierstil gar nicht zu erreichen.  So gesehen ist der Aufwand für die Implementierung eines selbstdefinierten Zielcontainer-Typs durchaus angemessen.

 

Literaturverweise

/KLLM/   Lambda-Ausdrücke und Methoden-Referenzen
Klaus Kreft & Angelika Langer, Java Magazin, Dezember 2013
URL: http://www.angelikalanger.com/Articles/EffectiveJava/71.Java8.Lambdas/71.Java8.Lambdas.html
/KLPS/       Parallel Streams
Klaus Kreft & Angelika Langer, Java Magazin, Juli 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/81.Java8.Parallel-Streams/81.Java8.Parallel-Streams.html
/KLKO/   Stream-Kollektoren und die Stream-Operation collect()
Klaus Kreft & Angelika Langer, Java Magazin, September 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/76.Java8.Stream-Collectors/76.Java8.Stream-Collectors.html
/KLOP/ Optional<T>
Klaus Kreft & Angelika Langer, Java Magazin, Mai 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/80.Java8.Optional-Result/80.Java8.Optional-Result.html
/KLRC/ reduce() vs. collect()
Klaus Kreft & Angelika Langer, Java Magazin, November 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/83.Java8.Reduce-vs-Collect-Stream-Operations/83.Java8.Reduce-vs-Collect-Stream-Operations.html

Die gesamte  Serie über Java 8:

/JAV8-0/ Neue Features in Java 8 - Überblick
Klaus Kreft & Angelika Langer, Java Magazin, März 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/73.Java8.Overview/73.Java8.Overview.html
/JAV8-1/ Funktionale Programmierung in Java
Klaus Kreft & Angelika Langer, Java Magazin, September 2013
URL: http://www.angelikalanger.com/Articles/EffectiveJava/70.Java8.FunctionalProg/70.Java8.FunctionalProg.html
/JAV8-2/ Lambda-Ausdrücke und Methoden-Referenzen
Klaus Kreft & Angelika Langer, Java Magazin, Dezember 2013
URL: http://www.angelikalanger.com/Articles/EffectiveJava/71.Java8.Lambdas/71.Java8.Lambdas.html
/JAV8-3/ Default-Methoden und statische Methoden in Interfaces
Klaus Kreft & Angelika Langer, Java Magazin, Februar 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/72.Java8.DefaultMethods/72.Java8.DefaultMethods.html
/JAV8-4/ Übersicht über das Stream API in Java 8
Klaus Kreft & Angelika Langer, Java Magazin, Mai 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/74.Java8.Streams-Overview/74.Java8.Streams-Overview.html
/JAV8-5/ Stream-Erzeugung und Stream-Operationen
Klaus Kreft & Angelika Langer, Java Magazin, Juli 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/75.Java8.Fundamental-Stream-Operations/75.Java8.Fundamental-Stream-Operations.html
/JAV8-6/ Stream-Kollektoren und die Stream-Operation collect()
Klaus Kreft & Angelika Langer, Java Magazin, September 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/76.Java8.Stream-Collectors/76.Java8.Stream-Collectors.html
/JAV8-7/ Stateful Lambdas - Regeln für die Seiteneffekte in Lambda-Ausdrücken, die an Stream-Operationen übergeben werden
Klaus Kreft & Angelika Langer, Java Magazin, November 2014
URL: http://www.angelikalanger.com/Articles/EffectiveJava/77.Java8.Streams-and-Statefulness/77.Java8.Streams-and-Statefulness.html
/JAV8-8/ Das Date/Time API
Klaus Kreft & Angelika Langer, Java Magazin, Januar 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/78.Java8.Date-Time-API/78.Java8.Date-Time-API.html
/JAV8-9/ CompletableFuture
Klaus Kreft & Angelika Langer, Java Magazin, März 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/79.Java8.CompletableFuture/79.Java8.CompletableFuture.html
/JAV8-10/ Optional<T>
Klaus Kreft & Angelika Langer, Java Magazin, Mai 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/80.Java8.Optional-Result/80.Java8.Optional-Result.html
/JAV8-11/ Parallel Streams
Klaus Kreft & Angelika Langer, Java Magazin, Juli 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/81.Java8.Parallel-Streams/81.Java8.Parallel-Streams.html
/JAV8-12/ Das Performance-Modell der Streams
Klaus Kreft & Angelika Langer, Java Magazin, September 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/82.Java8.Performance-Model-of-Streams/82.Java8.Performance-Model-of-Streams.html
/JAV8-13/ reduce() vs. collect()
Klaus Kreft & Angelika Langer, Java Magazin, November 2015
URL: http://www.angelikalanger.com/Articles/EffectiveJava/83.Java8.Reduce-vs-Collect-Stream-Operations/83.Java8.Reduce-vs-Collect-Stream-Operations.html
/JAV8-14/ User-Defined Collectors
Klaus Kreft & Angelika Langer, Java Magazin, Januar 2016
URL: http://www.angelikalanger.com/Articles/EffectiveJava/84.Java8.User-Defined-Stream-Collectors/84.Java8.User-Defined-Stream-Collectors.html
/JAV8-15/ Parallele Streams und Blockierende Funktionalität
Klaus Kreft & Angelika Langer, Java Magazin, März 2016
URL: http://www.angelikalanger.com/Articles/EffectiveJava/85.Java8.Streams-and-Blocking-Functionality/85.Java8.Streams-and-Blocking-Functionality.html
/JAV8-16/ API-Design mit Lambdas
Klaus Kreft & Angelika Langer, Java Magazin, Mai 2016
URL: http://www.angelikalanger.com/Articles/EffectiveJava/86.Java8.API-Design-With-Lambdas/86.Java8.API-Design-With-Lambdas.html
/JAV8-17/ Low-Level-Aspekte beim API Design mit Lambdas
Klaus Kreft & Angelika Langer, Java Magazin, Juli 2016
URL: http://www.angelikalanger.com/Articles/EffectiveJava/87.Java8.Programming-With-Lambdas/87.Java8.Programming-With-Lambdas.html
/JAV8-18/ Benutzer-definierte Stream-Sourcen und Spliteratoren
Klaus Kreft & Angelika Langer, Java Magazin, September 2016
URL: http://www.angelikalanger.com/Articles/EffectiveJava/88.Java8.User-Defined-Stream-Sources-And-Spliterators/88.Java8.User-Defined-Stream-Sources-And-Spliterators.html

 
 

If you are interested to hear more about this and related topics you might want to check out the following seminar:
Seminar
Lambdas & Streams - Java 8 Language Features and Stream API & Internals
3 day seminar ( open enrollment and on-site)
Java 8 - Lambdas & Stream, New Concurrency Utilities, Date/Time API
4 day seminar ( open enrollment and on-site)
Effective Java - Advanced Java Programming Idioms 
4 day seminar ( open enrollment and on-site)
 
Related Reading
Lambda & Streams Tutorial & Reference
In-Depth Coverage of all aspects of lambdas & streams
Lambdas in Java 8
Conference Presentation at JFokus 2012 (slides)
Lambdas in Java 8
Conference Presentation at JavaZone 2012 (video)
 

 
 
  © Copyright 1995-2018 by Angelika Langer.  All Rights Reserved.    URL: < http://www.AngelikaLanger.com/Articles/EffectiveJava/84.Java8.User-Defined-Stream-Collectors/84.Java8.User-Defined-Stream-Collectors.html  last update: 26 Oct 2018