Parallele Datensammlung mit Java 8

Fazit

Parallele Stream-Verarbeitung

Wird also je Thread eine eigene SummingUnit erstellt, die einen Teil der Werte aufsummiert, und werden nach der parallelen Verarbeitung die Teilergebnisse zusammengeführt, sind spezielle Mechanismen wie Synchronisation nicht erforderlich.

Zurück zur collect()-Methode der parallelen Stream-Verarbeitung. Die erwähnte Variante ist wie folgt definiert:

<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);

Der erste Parameter, supplier, steht für eine Funktion, die einen Ergebnisbehälter vom Typ R (Result) erzeugt. Im Beispiel hat die Funktion die Aufgabe, für jeden Thread eine neue SummingUnit zu erzeugen.

Die zweite Funktion ist ein Akkumulator. Das heißt, sie nimmt Werte entgegen und hält das Ergebnis der Berechnung temporär fest, um dann mit diesem (Zwischen-)Ergebnis und dem nächsten Wert die folgende Berechnung anzustellen. Das entspricht der sum()-Methode. Sie arbeitet auf einem Objekt vom Typ R (der SummingUnit) und akzeptiert einen Wert vom Typ T, was dem Typ eines Objekts des
Streams entspricht.

Zu guter Letzt die combine-Funktion, die Leser aus dem Thread-Beispiel kennen. Hier handelt es sich um ein Objekt vom Typ R (wieder SummingUnit), das ein anderes Objekt des Typs akzeptiert. Da die Collect()-Methode im Beispiel eine SummingUnit zurückgibt, ist anschließend getSum() aufzurufen, um die Summe zu erhalten.

System.out.println("total: " +
numbers.parallelStream().collect(
() -> new SummingUnit(), // supplier
(summingUnit, value) ->
summingUnit.sum(value), // accumulator
(summingUnit, other) ->
summingUnit.combine(other) // combiner
).getSum()
);

Die Namensgebung bei den Lambda-Ausdrücken ist frei wählbar. Aufgrund ihres kleinen Geltungsbereichs werden die Variablen daher häufig mit kurzen Namen versehen.

.collect(() -> new SummingUnit(), 
(s, v) -> s.sum(v), (s, o) -> s.combine(o))

An der Stelle lässt sich alternativ mit Methodenreferenzen arbeiten

.collect(SummingUnit::new, SummingUnit::sum, SummingUnit::combine)

Es ist demnach recht einfach möglich, Daten nach eigenem Gusto zu sammeln. In der Praxis werden Entwickler nicht mit so einer einfachen Anwendung daherkommen, doch lässt sich das Prinzip auf ein passendes Sammelobjekt der fachlichen Domäne übertragen.

Es ist auch möglich, die Sammelfunktionen in ein eigenes Objekt auszulagern und dort auch gleich den letzten Schritt des Ergebnisabrufs (im Beispiel getSum) vorzunehmen. Das ist genau das, was Entwickler mit der überladenen collect-Methode erreichen.

<R, A> R collect(Collector<? super T, A, R> collector);

Wie unschwer zu erkennen, erwartet collect hier ein Objekt vom Typ Collector. Dabei handelt es sich nicht um eine abstrakte Klasse, sondern um eine Schnittstelle. Im Folgenden wird die Klasse SummingCollector vorgestellt, die diese Schnittstelle implementiert. Die Anwendung gestaltet sich dann recht kurz:

System.out.println("total: " + 
numbers.parallelStream().collect(new SummingCollector()));

Es sind Methoden zu definieren, die einen Ergebnis-Container sowie eine Akkumulator- und eine Kombinierfunktion liefern. Dies ist von der ersten collect-Variante her bekannt. Zusätzlich ist eine Methode erforderlich, die eine Funktion für das finale Ergebnis liefert. Das ist an dieser Stelle zu beachten: Die Methoden liefern nicht direkte Ergebnisse, sondern Funktionen, welche die Ergebnisse produzieren. Eine Funktion lässt sich am einfachsten mittels Lambda-Ausdruck realisieren.

Eine letzte Methode liefert schließlich noch diverse Charakteristiken, die dem Compiler Hinweise über die Nutzung des Collectors geben. Der Einfachheit halber nutzt der vorgestellte SummingCollector die existierende SummingUnit, an die er einzelne Aufgaben deligiert.

public class SummingCollector 
implements Collector<Long, SummingUnit, Long>{
@Override
public Supplier<SummingUnit> supplier() {
return () -> new SummingUnit();
}

@Override
public BiConsumer<SummingUnit, Long> accumulator() {
return (s, v) -> s.sum(v);
}

@Override
public BinaryOperator<SummingUnit> combiner() {
return (left, right) -> {left.combine(right); return left;};
}

@Override
public Function<SummingUnit, Long> finisher() {
return s -> s.getSum();
}

@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.UNORDERED);
// do not add ", Characteristics.CONCURRENT"!
}

Die Klasse implementiert das typisierte Interface Collector. Die drei Typen (T, A, R; siehe Methodensignatur oben) besagen: Verarbeite ein Objekt vom Typ Long (eine Zahl), nutze einen Erbebniscontainer vom Typ SummingUnit und produziere ein finales Ergebnis vom Typ long (die Summe).

supplier liefert eine neue SummingUnit. Das funktioniert aber nur, da in den Charakteristiken der Wert
Characteristics.CONCURRENT nicht enthalten ist. Wird er angegeben, betrachtet der Compiler den Collector per se als Multithreading-fähig und erzeugt nur eine SummingUnit. Wie erläutert, ist die Methode sum() jedoch nicht für Multithreading ausgelegt. So wie es implementiert ist, erhalten Entwickler die gewünschte SummingUnit je Thread. Die Funktion accumulator nimmt eine
SummingUnit sowie einen Wert entgegen. Hier wird dann einfach addiert.

combiner arbeitet auf zwei SummingUnits, hier als links und rechts bezeichnet. Anders als bei der ersten Variante wird nicht zu einem "führenden" Ergebniscontainer der zweite kombiniert. Vielmehr sind beide gleichberechtigt. Nach der combine-Operation ist ein Ergebniscontainer zurückzugeben. Im Beispiel ist das der linke. Je nach Aufgabenstellung können Entwickler aber auch einen völlig neuen Container erzeugen, der die kombinierten Werte enthält.

Hinzugekommen ist die Funktion finisher, die einen (den verbleibenden) Ergebniscontainer entgegennimmt und das finale Ergebnis liefert. Zu guter Letzt sehen Leser die Angabe der Charakteristik unordered, die dem Compiler mitteilt, dass die Reihenfolge bei der Verarbeitung der Stream-Objekte keine Rolle spielt. Das ermöglicht eine effektive Parallelisierung.

Fazit

Lambdas und Streams erlauben die einfache Parallelisierung ohne explizite Nutzung von Threads, Synchronisierung, Locks und sonstigen Dingen, die mindestens keinen Spaß machen und manche Entwickler gar davon abhalten, "multithreaded" zu programmieren. Wie das einfache Beispiel gezeigt hat, ist die Kenntniss dessen, was hinter den Kulissen abläuft, sehr hilfreich, um parallele Verabreitung mit Streams sinnvoll umzusetzen. Mit dem Wissen lassen sich deutlich komplexere Datensammler erstellen. (ane)

Michael Müller
ist als Bereichsleiter Softwareentwicklung der InEK GmbH verantwortlich für Projekte im Web-, Java- und .NET-Umfeld. Daneben betätigt er sich als freier Autor und verfasst Fachartikel zu diversen Entwicklungsthemen sowie Buchrezensionen.