Parallele Datensammlung mit Java 8

Zu den großen Neuerungen von Java 8 gehörten Lambda-Ausdrücke und das Stream-Interface. Das aktuelle Java ist zwar schon eine Weile verfügbar, doch noch immer sind viele Unternehmen dabei, darauf umzustellen. Gut, wer dann mit parallelen Collections umgehen kann.

Sprachen  –  3 Kommentare
Parallele Datensammlumg mit Java 8

Einführende Artikel zu Lambdas und Streams hat es auf heise Developer schon gegeben. Insofern werden in diesem Artikel die Konzepte nur kurz aufgefrischt, um über eine kurze Betrachtung der Java-Nebenläufigkeit den Fokus auf spezielle Aspekte wie parallele Collections zu legen.

Basics zur Parallelität in Java 8

Ein Stream ist als kontinuierlicher Fluss von Daten vorstellbar, ähnlich einem InputStream. Die Daten werden von einer Quelle, zum Beispiel einer Collection, in Umlauf gebracht. Ein Datum ist jedoch nicht einfach ein Byte, sondern kann ein beliebiges Objekt darstellen. Im Falle einer Collection als Datenquelle entspricht ein Datum des Streams dem der Collection.

Prinzipieller Aufbau eines Streams (Abb. 1)

Ein ParallelStream ist als paralleler Strom von Objekten vorstellbar. Für die Aufteilung und Anzahl der (parallelen) Ströme ist der sogenannte Spliterator (Splitting Iterator) zuständig, der entscheidet, ob die Daten in kleinere, parallel zu verarbeitende Einheiten aufgeteilt werden, und über die Daten einer solchen Einheit iteriert. Unter der Haube wird dabei das Fork-Join-Framework von Java genutzt.

Prinzipieller Aufbau eines parallelen Streams (Abb. 2)

Am Ende des Streams steht eine terminale Operation. Sie führt die parallelen Ströme zu einem Ergebnis zusammen. In den Stream lassen sich neben der terminalen beliebige intermediäre Operationen einfügen, die in einer Kette durchgeführt werden. Insofern gibt es aus Entwicklersicht nur Eingangs- und Ausgangsdaten, aber keine expliziten Zwischenergebnisse.

ParallelStream mit intermediärer und terminaler Operation (Abb. 3)

Abbildung 3 zeigt beispielhaft einen parallelen Stream mit Filter- und Map-Operation sowie einem List Collector als terminaler Operation. Ein solcher Stream verhält sich "lazy": Er lässt sich mit seinen intermediären Operationen als Prozesskette aufbauen, ohne dass tatsächlich Operationen ausgeführt werden. Erst mit Aufruf der terminalen Operation wird der Stream gestartet. Für Collections ergibt sich damit der folgende prinzipielle Aufbau:

AnyCollection
.stream()
[.intermediateOperation1()
[.intermediateOperation2()
[....]]]
.terminalOperation();

Wie die eckigen Klammern andeuten, können Entwickler keine bis beliebig viele intermediäre Operationen angeben. Zur Ausführung reicht jedoch eine terminale Operation.

Streams sind nicht nur auf Collections definiert, sondern auch in anderen Interfaces oder Klassen. Sie erweitern als Default-Methoden existierende Interfaces und können mit unterschiedlichen Namen daherkommen, zum Beispiel als .intStream() oder als .lines().

Im folgenden Beispiel erzeugt createNumbers() einfach eine Liste von Zahlen. Aus ihr sollen nun die Vorkommen aller Werte kleiner 10 gezählt werden. Hierzu nutzt man als intermediäre Operation einen Filter, der nur Zahlen kleiner 10 passieren lässt. Die Methode filter() arbeitet auf dem Interface Predicate, das eine abstrakte Methode test(Object o) definiert. Hier wird jeweils ein Element aus dem Stream übergeben.

public class SimpleStream {
public static void main(String[] args) {
List<Integer> numbers = createNumbers();
System.out.println("Count < 10: " +
numbers.stream().filter(new LessThan10()).count());
}

private static class LessThan10 implements Predicate{
@Override
public boolean test(Object n) {
return (int)n < 10;
}
}
}

Doch erscheint das Ganze recht umständlich. Zwar lässt sich mit dem Stream die Schleife verstecken und somit die Bearbeitung linearisieren, doch stört die aufwendige Definition der Prüfklasse. An der Stelle kommen nun die Lambda-Ausdrücke ins Spiel. Sie lassen sich an Stellen funktionaler Interfaces nutzen. Als solche wird eine Schnittstelle mit genau einer abstrakten Methode bezeichnet.

public class SimpleStream {
public static void main(String[] args) {
List<Integer> numbers = createNumbers();
System.out.println("Count < 10: " +
numbers.stream().filter(n -> n < 10).count());
}
}

n ist hierbei ein frei gewählter Bezeichner, der für ein im Stream vorkommendes Objekt steht. Dieses n wird nun als Parameter in die Funktion hineingegeben. Die zu überschreibende Methode test(Object n) gibt einen booleschen Wert zurück, und zwar mittels n < 10. Da der Compiler an der Stelle den Typ des Objekts anhand der Collection ermitteln kann, sind weder eine Typangabe für den Parameter n noch ein explizites Casting erforderlich.

Der Lambda-Operator -> mag ein wenig an Mathematik erinnern. Hier heißt es x ->f(x): Ein Parameter x wird auf eine Funktion f(x) abgebildet. Wer JavaScript kennt, dem ist obiger Ausdruck vielleicht als function(n) n < 10 bekannt.

Für Parameter in Lambda-Ausdrücken gelten unter anderem folgende Regeln:

  • Typ + Name wie Parameter in Klammern setzen: (int x, int y) -> x * y;
  • Eindeutig ableitbare Typen können entfallen: (x, y) -> x * y;
  • Eine leere Parameterliste ist ebenfalls möglich: () -> getVendorCount(persons)
  • Ein Parameter ohne Typangabe ohne Klammerung möglich: (x) -> x * x; oder x -> x * x;

Soweit eine kurze Auffrischung der Grundlagen zu Lambdas und Streams. Neben den genannten Artikeln sind die Grundlagen in einem E-Book des Autors ausführlich beschrieben.

Parallele Datensammler

In diesem Artikel geht es um die parallele Datensammlung. Um das Beispiel einfach zu halten, werden Zahlen nur summiert. Die gezeigten Fallstricke und Lösungen lassen sich dann auf andere Aufgabenstellungen übertragen. Als terminale Operation kommt die Methode collect zum Einsatz:

long sum = numbers.parallelStream().collect(...);

Sie dient dem Sammeln von Daten in einem Behälter. Während er bestehen bleibt, ändert sich sein Inhalt. Man hat es mit einer veränderlichen Datenstruktur zu tun. Im Gegensatz dazu arbeitet das im Artikel nicht behandelte reduce mit unveränderlichen Datenstrukturen.

collect ist eine überladene Methode, die mit zwei unterschiedlichen Signaturen definiert ist. Sie erwartet drei Parameter: supplier, accumulator und combiner. Was es mit ihnen auf sich hat, erfahren Leser nach einem kurzen Exkurs in die Grundlagen der Java-Concurrency.