zurück zum Artikel

Parallele Datensammlung mit Java 8

Sprachen
Parallele Datensammlumg mit Java 8

Zu den großen Neuerungen von Java 8 gehörten Lambda-Ausdrücke und das Stream-Interface. Das aktuelle Java ist zwar schon eine Weile verfügbar, doch noch immer sind viele Unternehmen dabei, darauf umzustellen. Gut, wer dann mit parallelen Collections umgehen kann.

Einführende Artikel zu Lambdas [1] und Streams [2] hat es auf heise Developer schon gegeben. Insofern werden in diesem Artikel die Konzepte nur kurz aufgefrischt, um über eine kurze Betrachtung der Java-Nebenläufigkeit den Fokus auf spezielle Aspekte wie parallele Collections zu legen.

Basics zur Parallelität in Java 8

Ein Stream ist als kontinuierlicher Fluss von Daten vorstellbar, ähnlich einem InputStream. Die Daten werden von einer Quelle, zum Beispiel einer Collection, in Umlauf gebracht. Ein Datum ist jedoch nicht einfach ein Byte, sondern kann ein beliebiges Objekt darstellen. Im Falle einer Collection als Datenquelle entspricht ein Datum des Streams dem der Collection.

Prinzipieller Aufbau eines Streams (Abb. 1)
Prinzipieller Aufbau eines Streams (Abb. 1)

Ein ParallelStream ist als paralleler Strom von Objekten vorstellbar. Für die Aufteilung und Anzahl der (parallelen) Ströme ist der sogenannte Spliterator (Splitting Iterator) zuständig, der entscheidet, ob die Daten in kleinere, parallel zu verarbeitende Einheiten aufgeteilt werden, und über die Daten einer solchen Einheit iteriert. Unter der Haube wird dabei das Fork-Join-Framework von Java genutzt.

Prinzipieller Aufbau eines parallelen Streams (Abb. 2)
Prinzipieller Aufbau eines parallelen Streams (Abb. 2)

Am Ende des Streams steht eine terminale Operation. Sie führt die parallelen Ströme zu einem Ergebnis zusammen. In den Stream lassen sich neben der terminalen beliebige intermediäre Operationen einfügen, die in einer Kette durchgeführt werden. Insofern gibt es aus Entwicklersicht nur Eingangs- und Ausgangsdaten, aber keine expliziten Zwischenergebnisse.

ParallelStream mit intermediärer und terminaler Operation (Abb. 3)
ParallelStream mit intermediärer und terminaler Operation (Abb. 3)

Abbildung 3 zeigt beispielhaft einen parallelen Stream mit Filter- und Map-Operation sowie einem List Collector als terminaler Operation. Ein solcher Stream verhält sich "lazy": Er lässt sich mit seinen intermediären Operationen als Prozesskette aufbauen, ohne dass tatsächlich Operationen ausgeführt werden. Erst mit Aufruf der terminalen Operation wird der Stream gestartet. Für Collections ergibt sich damit der folgende prinzipielle Aufbau:

AnyCollection
.stream()
[.intermediateOperation1()
[.intermediateOperation2()
[....]]]
.terminalOperation();

Wie die eckigen Klammern andeuten, können Entwickler keine bis beliebig viele intermediäre Operationen angeben. Zur Ausführung reicht jedoch eine terminale Operation.

Streams sind nicht nur auf Collections definiert, sondern auch in anderen Interfaces oder Klassen. Sie erweitern als Default-Methoden existierende Interfaces und können mit unterschiedlichen Namen daherkommen, zum Beispiel als .intStream() oder als .lines().

Im folgenden Beispiel erzeugt createNumbers() einfach eine Liste von Zahlen. Aus ihr sollen nun die Vorkommen aller Werte kleiner 10 gezählt werden. Hierzu nutzt man als intermediäre Operation einen Filter, der nur Zahlen kleiner 10 passieren lässt. Die Methode filter() arbeitet auf dem Interface Predicate, das eine abstrakte Methode test(Object o) definiert. Hier wird jeweils ein Element aus dem Stream übergeben.

public class SimpleStream {
public static void main(String[] args) {
List<Integer> numbers = createNumbers();
System.out.println("Count < 10: " +
numbers.stream().filter(new LessThan10()).count());
}

private static class LessThan10 implements Predicate{
@Override
public boolean test(Object n) {
return (int)n < 10;
}
}
}

Doch erscheint das Ganze recht umständlich. Zwar lässt sich mit dem Stream die Schleife verstecken und somit die Bearbeitung linearisieren, doch stört die aufwendige Definition der Prüfklasse. An der Stelle kommen nun die Lambda-Ausdrücke ins Spiel. Sie lassen sich an Stellen funktionaler Interfaces nutzen. Als solche wird eine Schnittstelle mit genau einer abstrakten Methode bezeichnet.

public class SimpleStream {
public static void main(String[] args) {
List<Integer> numbers = createNumbers();
System.out.println("Count < 10: " +
numbers.stream().filter(n -> n < 10).count());
}
}

n ist hierbei ein frei gewählter Bezeichner, der für ein im Stream vorkommendes Objekt steht. Dieses n wird nun als Parameter in die Funktion hineingegeben. Die zu überschreibende Methode test(Object n) gibt einen booleschen Wert zurück, und zwar mittels n < 10. Da der Compiler an der Stelle den Typ des Objekts anhand der Collection ermitteln kann, sind weder eine Typangabe für den Parameter n noch ein explizites Casting erforderlich.

Der Lambda-Operator -> mag ein wenig an Mathematik erinnern. Hier heißt es x ->f(x): Ein Parameter x wird auf eine Funktion f(x) abgebildet. Wer JavaScript kennt, dem ist obiger Ausdruck vielleicht als function(n) n < 10 bekannt.

Für Parameter in Lambda-Ausdrücken gelten unter anderem folgende Regeln:

Soweit eine kurze Auffrischung der Grundlagen zu Lambdas und Streams. Neben den genannten Artikeln sind die Grundlagen in einem E-Book des Autors [3] ausführlich beschrieben.

Parallele Datensammler

In diesem Artikel geht es um die parallele Datensammlung. Um das Beispiel einfach zu halten, werden Zahlen nur summiert. Die gezeigten Fallstricke und Lösungen lassen sich dann auf andere Aufgabenstellungen übertragen. Als terminale Operation kommt die Methode collect zum Einsatz:

long sum = numbers.parallelStream().collect(...);

Sie dient dem Sammeln von Daten in einem Behälter. Während er bestehen bleibt, ändert sich sein Inhalt. Man hat es mit einer veränderlichen Datenstruktur zu tun. Im Gegensatz dazu arbeitet das im Artikel nicht behandelte reduce mit unveränderlichen Datenstrukturen.

collect ist eine überladene Methode, die mit zwei unterschiedlichen Signaturen definiert ist. Sie erwartet drei Parameter: supplier, accumulator und combiner. Was es mit ihnen auf sich hat, erfahren Leser nach einem kurzen Exkurs in die Grundlagen der Java-Concurrency.

Exkurs Concurrency

Ausflugsziel Nebenläufigkeit

Concurrency wird im Deutschen als Nebenläufigkeit bezeichnet. Das drückt aus, dass mehrere Programmstränge (Threads) nebeneinander – aber nicht unbedingt gleichzeitig – ausgeführt werden. Klassisches Beispiel ist die Verlagerung rechenintensiver Aufgaben in den Hintergrund, damit das User Interface nicht blockiert.

Von Parallelität spricht man, wenn die nebenläufigen Programmstränge gleichzeitig die gleichen Algorithmen ausführen. Dabei gehen die Meinungen auseinander, ob nun Parallelität einen Spezialfall der Nebenläufigkeit oder eine andere Form der Programmausführung darstellt. Das spielt für die Betrachtung hier jedoch keine Rolle. Im Rahmen der parallelen Streams werden gleiche Algorithmen gleichzeitig auf unterschiedliche Daten des Streams angewendet. In Abbildung 3 ist das als vertikaler Schnitt durch die Datenströme erkennbar, bei dem immer dieselbe Operation in einer Linie angeordnet ist.

Java wurde von Anfang an auf Multithreading ausgelegt. So existiert die Klasse Thread bereits seit dem JDK 1.0.

Unterscheidung Prozess – Thread – Programm

Siehe auch die Java-SE-Dokumentation [4].

public static void main(String[] args) {
Thread thread = new MyThread();
thread.start();
System.out.println("Message from Main");
}

private static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Message from MyThread");
}
}

Ein Thread lässt sich einfach als Klasse implementieren, die von Thread erbt. Sie muss lediglich die Methode run() überschreiben. Im aufrufenden Programm wird eine neue Instanz der Klasse angelegt und mittels start() ausgeführt. start() sorgt hinter den Kulissen dafür, dass ein neuer Thread erzeugt wird, und ruft dann in ihm die Methode run() auf.

Die Nutzung von Thread erscheint auf den ersten Blick intuitiv. Dass die eigene Klasse von Thread erben muss, erweist sich jedoch nicht als hilfreich, schließlich möchte man in der Regel bei Klassenhierarchien fachliche Abhängigkeiten abbilden. Insofern ist Thread als Oberklasse nicht unbedingt nutzbar; es geht auch anders: Statt von Thread zu erben, implementiert die Klasse einfach das Interface Runnable. Die Klasse Thread verfährt intern übrigens nicht anders.

public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
System.out.println("Message from Main");
}

private static class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Message from MyRunnable");
}
}

Zum Erzeugen eines Threads wird die eine Instanz der eigenen Klasse in eine neue von Thread verpackt. Ansonsten ändert sich nichts.

Bevor die Leser zur Lösung mit parallelen Streams zurückkommen, sei die Addition von Zahlen mittels Threading betrachtet. Für die Summierung wird die Klasse SummingUnit genutzt.

private class SummingUnit {

public SummingUnit() {
System.out.println("ctor SummingUnit");
}

private long sum = 0;

public void sum(long val) {
sum += val;
}

public long getSum() {
return sum;
}

public void combine(SummingUnit other) {
sum += other.sum;
}
}

Die Klasse enthält intern eine Summenvariable, welche die an sum() übergebenen Werte aufsummiert, sowie einen Getter, um die Summe auszulesen. Die Methode combine() wird zunächst nicht benötigt. Aufmerksame Leser mögen hier jedoch einen Bezug zum dritten Parameter von collect() erkennen.

public static void main(String[] args) throws InterruptedException {
List<Long> numbers = Utils.createNumbers();
int size = numbers.size();
SummingUnit summingUnit = new SummingUnit();

Thread thread = new Thread(new SumTask(numbers, 0,
size / 2, summingUnit));
thread.start();

SumTask sumTask = new SumTask(numbers, size / 2,
size, summingUnit);
sumTask.run();
thread.join(); // wait for thread to complete

System.out.println("Sum: " + summingUnit.getSum());
}


private static class SumTask implements Runnable {

private final List<Integer> _numbers;
private final int _start;
private final int _end;
private final SummingUnit _summingUnit;

public SumTask(List<Integer> numbers,
int start,
int end,
SummingUnit summingUnit) {
_numbers = numbers;
_start = start;
_end = end;
_summingUnit = summingUnit;
}

@Override
public void run() {
for (int i = _start; i < _end; i++) {
_summingUnit.sum(_numbers.get(i));
}
}
}

Race Conditions

Hier verarbeitet SumTask die Liste mit den Zahlen partiell. In der Hauptroutine wird SumTask dazu einmal mittels neuem Thread (therad.start), das zweite Mal direkt sumTask.run) aufgerufen. Vor der Ausgabe der Summe wartet das Programm mittels thread.join() auf die Beendigung des Threads, um sicherzustellen, dass die Berechnung auch abgeschlossen ist. Trotz der Maßnahme gibt das Programm bei mehrfacher Ausführung unterschiedliche Ergebnisse aus. Wie kann das sein?

Race Condition

Betrachtet man die Summierung:

sum += val;

so ist das, was nach einer einfachen Operation aussieht, in Wirklichkeit eine ganze Befehlsfolge. Es handelt sich um syntaktischen Zucker, denn die Anweisung lässt sich auch anders schreiben.

sum = sum + val;

Ein gewöhnlicher Computer kann nicht einfach zu einer Variable, die irgendwo im Speicher liegt, einen Wert addieren. Vielmehr erfolgt die Addition in einem Prozessorregister. Damit ergibt sich eine Anweisungsfolge, ähnlich folgendem Pseudocode:

register.load(sum)
register.add(val)
register.write(sum)

Der Wert der Variablen sum wird in ein Register geladen, der Wert von val addiert und das Ergebnis in der Variablen sum abgelegt. Diese drei prinzipiellen Operationen laufen nicht als atomare Einheit ab. Vielmehr lassen sich die Operationen der unterschiedlichen Threads ineinander schachteln, wobei jeder Thread ein eigenes Register nutzen kann. Betrachtet sei ein möglicher Ablauf zweier Threads. Dabei wird ein initialer Wert von sum=0 angenommen. Thread 1 verarbeitet val=5 und Thread 2 val=3. Jeder Thread hat sein eigenes Register.

Operation Ergebnis
Thread 1 register1.load(sum) register1 = 0
Thread 1 register1.add(5) register1 = 5
Thread 1 register1.write(sum) sum = 5
Thread 2 register2.load(sum) register2 = 5
Thread 2 register2.add(3) register2 = 8
Thread 2 register2.write(sum) sum = 8

Da beide Threads unabhängig von- und parallel zueinander werkeln, kann die Reihenfolge der Operationen auch anders erfolgen, beispielsweise so:

Operation Ergebnis
Thread 1 register1.load(sum) register1 = 0
Thread 1 register1.add(5) register1 = 5
Thread 2 register2.load(sum) register2 = 0 (sum ist noch 0!)
Thread 1 register1.write(sum) sum = 5
Thread 2 register2.add(3) register2 = 3
Thread 2 register2.write(sum) sum = 3

In diesem Beispiel wird register2 geladen, bevor der erste Thread sein Ergebnis zurückgeschrieben hat. Somit ist sum noch 0. Wie leicht zu sehen ist, enthält sum nach Ablauf beider Additionen den falschen Wert. Bei dem Problem handelt es sich um eine sogenannte Race Condition. Das Ergebnis ist davon abhängig, wer im Wettlauf um die Operation auf die gemeinsame Variable vorne liegt. Die Methode sum() wird so zu einer kritischen Sektion. Eine solche ist vor konkurrierendem Zugriff zu schützen. Java bietet hierfür verschiedene Mechanismen. Beispielsweise können Entwickler vor Eintritt in die kritische Sektion eine Sperre (lock) setzen oder sie überlassen das Java, indem sie die Methode mit dem Qualifier synchronized versehen.

public synchronized void sum(long val) 

Damit sperren sie die Methode automatisch für andere Threads, solange ein ebensolcher diese durchläuft. Praktisch werden damit die Aufrufe serialisiert, einer nach dem anderen, auf Kosten der Parallelität. Das kann nicht die gewünschte Lösung sein.

Java bietet das Schlüsselwort volatile, mit dem sich die Deklaration der Variablen sum versehen lässt. Dadurch sichert Java lediglich zu, dass nach dem Schreiben in diese Variable genau deren neuer Wert gelesen wird. Das hilft aber im beschriebenen Szenario nicht weiter. (Allerdings könnte ohne volatile auch der erste Ablauf fehlschlagen: Obwohl die 5 nach sum geschrieben wurde, könnte die folgende Operation aufgrund alten Cache-Inhalts noch eine 0 lesen.)

Kritische Sektionen vermeiden

Die gewünschte Lösung muss vielmehr darin liegen, kritische Sektionen wo immer möglich zu vermeiden. Im Beispiel lässt sich das dadurch erreichen, dass jeder Thread mit einer eigenen SummingUnit arbeitet. Damit gibt es keine gemeinsam genutzte Variable mehr. Stattdessen sind die Teilergebnisse der einzelnen SummingUnits am Ende zusammenzutragen – hier greift die combine()-Methode.

public static void main(String[] args) throws InterruptedException {
List<Long> numbers = Utils.createNumbers();
int size = numbers.size();
SummingUnit summingUnit = new SummingUnit();
SummingUnit summingUnit2 = new SummingUnit();

Thread thread = new Thread(new SumTask(numbers, 0,
size / 2, summingUnit));
thread.start();

SumTask sumTask = new SumTask(numbers, size / 2,
size, summingUnit2);
sumTask.run();
thread.join(); // wait for thread to complete

summingUnit.combine(summingUnit2);
System.out.println("Sum: " + summingUnit.getSum());
}

Fazit

Parallele Stream-Verarbeitung

Wird also je Thread eine eigene SummingUnit erstellt, die einen Teil der Werte aufsummiert, und werden nach der parallelen Verarbeitung die Teilergebnisse zusammengeführt, sind spezielle Mechanismen wie Synchronisation nicht erforderlich.

Zurück zur collect()-Methode der parallelen Stream-Verarbeitung. Die erwähnte Variante ist wie folgt definiert:

<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);

Der erste Parameter, supplier, steht für eine Funktion, die einen Ergebnisbehälter vom Typ R (Result) erzeugt. Im Beispiel hat die Funktion die Aufgabe, für jeden Thread eine neue SummingUnit zu erzeugen.

Die zweite Funktion ist ein Akkumulator. Das heißt, sie nimmt Werte entgegen und hält das Ergebnis der Berechnung temporär fest, um dann mit diesem (Zwischen-)Ergebnis und dem nächsten Wert die folgende Berechnung anzustellen. Das entspricht der sum()-Methode. Sie arbeitet auf einem Objekt vom Typ R (der SummingUnit) und akzeptiert einen Wert vom Typ T, was dem Typ eines Objekts des
Streams entspricht.

Zu guter Letzt die combine-Funktion, die Leser aus dem Thread-Beispiel kennen. Hier handelt es sich um ein Objekt vom Typ R (wieder SummingUnit), das ein anderes Objekt des Typs akzeptiert. Da die Collect()-Methode im Beispiel eine SummingUnit zurückgibt, ist anschließend getSum() aufzurufen, um die Summe zu erhalten.

System.out.println("total: " +
numbers.parallelStream().collect(
() -> new SummingUnit(), // supplier
(summingUnit, value) ->
summingUnit.sum(value), // accumulator
(summingUnit, other) ->
summingUnit.combine(other) // combiner
).getSum()
);

Die Namensgebung bei den Lambda-Ausdrücken ist frei wählbar. Aufgrund ihres kleinen Geltungsbereichs werden die Variablen daher häufig mit kurzen Namen versehen.

.collect(() -> new SummingUnit(), 
(s, v) -> s.sum(v), (s, o) -> s.combine(o))

An der Stelle lässt sich alternativ mit Methodenreferenzen arbeiten

.collect(SummingUnit::new, SummingUnit::sum, SummingUnit::combine)

Es ist demnach recht einfach möglich, Daten nach eigenem Gusto zu sammeln. In der Praxis werden Entwickler nicht mit so einer einfachen Anwendung daherkommen, doch lässt sich das Prinzip auf ein passendes Sammelobjekt der fachlichen Domäne übertragen.

Es ist auch möglich, die Sammelfunktionen in ein eigenes Objekt auszulagern und dort auch gleich den letzten Schritt des Ergebnisabrufs (im Beispiel getSum) vorzunehmen. Das ist genau das, was Entwickler mit der überladenen collect-Methode erreichen.

<R, A> R collect(Collector<? super T, A, R> collector);

Wie unschwer zu erkennen, erwartet collect hier ein Objekt vom Typ Collector. Dabei handelt es sich nicht um eine abstrakte Klasse, sondern um eine Schnittstelle. Im Folgenden wird die Klasse SummingCollector vorgestellt, die diese Schnittstelle implementiert. Die Anwendung gestaltet sich dann recht kurz:

System.out.println("total: " + 
numbers.parallelStream().collect(new SummingCollector()));

Es sind Methoden zu definieren, die einen Ergebnis-Container sowie eine Akkumulator- und eine Kombinierfunktion liefern. Dies ist von der ersten collect-Variante her bekannt. Zusätzlich ist eine Methode erforderlich, die eine Funktion für das finale Ergebnis liefert. Das ist an dieser Stelle zu beachten: Die Methoden liefern nicht direkte Ergebnisse, sondern Funktionen, welche die Ergebnisse produzieren. Eine Funktion lässt sich am einfachsten mittels Lambda-Ausdruck realisieren.

Eine letzte Methode liefert schließlich noch diverse Charakteristiken, die dem Compiler Hinweise über die Nutzung des Collectors geben. Der Einfachheit halber nutzt der vorgestellte SummingCollector die existierende SummingUnit, an die er einzelne Aufgaben deligiert.

public class SummingCollector 
implements Collector<Long, SummingUnit, Long>{
@Override
public Supplier<SummingUnit> supplier() {
return () -> new SummingUnit();
}

@Override
public BiConsumer<SummingUnit, Long> accumulator() {
return (s, v) -> s.sum(v);
}

@Override
public BinaryOperator<SummingUnit> combiner() {
return (left, right) -> {left.combine(right); return left;};
}

@Override
public Function<SummingUnit, Long> finisher() {
return s -> s.getSum();
}

@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.UNORDERED);
// do not add ", Characteristics.CONCURRENT"!
}

Die Klasse implementiert das typisierte Interface Collector. Die drei Typen (T, A, R; siehe Methodensignatur oben) besagen: Verarbeite ein Objekt vom Typ Long (eine Zahl), nutze einen Erbebniscontainer vom Typ SummingUnit und produziere ein finales Ergebnis vom Typ long (die Summe).

supplier liefert eine neue SummingUnit. Das funktioniert aber nur, da in den Charakteristiken der Wert
Characteristics.CONCURRENT nicht enthalten ist. Wird er angegeben, betrachtet der Compiler den Collector per se als Multithreading-fähig und erzeugt nur eine SummingUnit. Wie erläutert, ist die Methode sum() jedoch nicht für Multithreading ausgelegt. So wie es implementiert ist, erhalten Entwickler die gewünschte SummingUnit je Thread. Die Funktion accumulator nimmt eine
SummingUnit sowie einen Wert entgegen. Hier wird dann einfach addiert.

combiner arbeitet auf zwei SummingUnits, hier als links und rechts bezeichnet. Anders als bei der ersten Variante wird nicht zu einem "führenden" Ergebniscontainer der zweite kombiniert. Vielmehr sind beide gleichberechtigt. Nach der combine-Operation ist ein Ergebniscontainer zurückzugeben. Im Beispiel ist das der linke. Je nach Aufgabenstellung können Entwickler aber auch einen völlig neuen Container erzeugen, der die kombinierten Werte enthält.

Hinzugekommen ist die Funktion finisher, die einen (den verbleibenden) Ergebniscontainer entgegennimmt und das finale Ergebnis liefert. Zu guter Letzt sehen Leser die Angabe der Charakteristik unordered, die dem Compiler mitteilt, dass die Reihenfolge bei der Verarbeitung der Stream-Objekte keine Rolle spielt. Das ermöglicht eine effektive Parallelisierung.

Fazit

Lambdas und Streams erlauben die einfache Parallelisierung ohne explizite Nutzung von Threads, Synchronisierung, Locks und sonstigen Dingen, die mindestens keinen Spaß machen und manche Entwickler gar davon abhalten, "multithreaded" zu programmieren. Wie das einfache Beispiel gezeigt hat, ist die Kenntniss dessen, was hinter den Kulissen abläuft, sehr hilfreich, um parallele Verabreitung mit Streams sinnvoll umzusetzen. Mit dem Wissen lassen sich deutlich komplexere Datensammler erstellen. (ane [5])

Michael Müller
ist als Bereichsleiter Softwareentwicklung der InEK GmbH verantwortlich für Projekte im Web-, Java- und .NET-Umfeld. Daneben betätigt er sich als freier Autor und verfasst Fachartikel zu diversen Entwicklungsthemen sowie Buchrezensionen.


URL dieses Artikels:
http://www.heise.de/-3304756

Links in diesem Artikel:
[1] https://www.heise.de/developer/artikel/Was-Entwickler-mit-Java-8-erwartet-1932997.html
[2] https://www.heise.de/developer/artikel/Streams-und-Collections-in-Java-8-2151376.html
[3] https://leanpub.com/lambdas-de
[4] https://docs.oracle.com/javase/tutorial/essential/concurrency/procthread.html
[5] mailto:ane@heise.de