Parallele Datensammlung mit Java 8

Exkurs Concurrency

Ausflugsziel Nebenläufigkeit

Concurrency wird im Deutschen als Nebenläufigkeit bezeichnet. Das drückt aus, dass mehrere Programmstränge (Threads) nebeneinander – aber nicht unbedingt gleichzeitig – ausgeführt werden. Klassisches Beispiel ist die Verlagerung rechenintensiver Aufgaben in den Hintergrund, damit das User Interface nicht blockiert.

Von Parallelität spricht man, wenn die nebenläufigen Programmstränge gleichzeitig die gleichen Algorithmen ausführen. Dabei gehen die Meinungen auseinander, ob nun Parallelität einen Spezialfall der Nebenläufigkeit oder eine andere Form der Programmausführung darstellt. Das spielt für die Betrachtung hier jedoch keine Rolle. Im Rahmen der parallelen Streams werden gleiche Algorithmen gleichzeitig auf unterschiedliche Daten des Streams angewendet. In Abbildung 3 ist das als vertikaler Schnitt durch die Datenströme erkennbar, bei dem immer dieselbe Operation in einer Linie angeordnet ist.

Java wurde von Anfang an auf Multithreading ausgelegt. So existiert die Klasse Thread bereits seit dem JDK 1.0.

Unterscheidung Prozess – Thread – Programm

  • Ein Prozess verfügt über eine komplette Ablaufumgebung mit eigenem, meist vor anderen Prozessen geschützten Speicher.
  • Ein Thread ist ein Ablaufstrang innerhalb des Prozesses.
  • In einem Prozess existiert mindestens ein Thread, bei Concurrency sind es mehrere.
  • Ein Programm wird durch einen oder mehrere Prozesse ausgeführt.

Siehe auch die Java-SE-Dokumentation.

public static void main(String[] args) {
Thread thread = new MyThread();
thread.start();
System.out.println("Message from Main");
}

private static class MyThread extends Thread {
@Override
public void run() {
System.out.println("Message from MyThread");
}
}

Ein Thread lässt sich einfach als Klasse implementieren, die von Thread erbt. Sie muss lediglich die Methode run() überschreiben. Im aufrufenden Programm wird eine neue Instanz der Klasse angelegt und mittels start() ausgeführt. start() sorgt hinter den Kulissen dafür, dass ein neuer Thread erzeugt wird, und ruft dann in ihm die Methode run() auf.

Die Nutzung von Thread erscheint auf den ersten Blick intuitiv. Dass die eigene Klasse von Thread erben muss, erweist sich jedoch nicht als hilfreich, schließlich möchte man in der Regel bei Klassenhierarchien fachliche Abhängigkeiten abbilden. Insofern ist Thread als Oberklasse nicht unbedingt nutzbar; es geht auch anders: Statt von Thread zu erben, implementiert die Klasse einfach das Interface Runnable. Die Klasse Thread verfährt intern übrigens nicht anders.

public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
System.out.println("Message from Main");
}

private static class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Message from MyRunnable");
}
}

Zum Erzeugen eines Threads wird die eine Instanz der eigenen Klasse in eine neue von Thread verpackt. Ansonsten ändert sich nichts.

Bevor die Leser zur Lösung mit parallelen Streams zurückkommen, sei die Addition von Zahlen mittels Threading betrachtet. Für die Summierung wird die Klasse SummingUnit genutzt.

private class SummingUnit {

public SummingUnit() {
System.out.println("ctor SummingUnit");
}

private long sum = 0;

public void sum(long val) {
sum += val;
}

public long getSum() {
return sum;
}

public void combine(SummingUnit other) {
sum += other.sum;
}
}

Die Klasse enthält intern eine Summenvariable, welche die an sum() übergebenen Werte aufsummiert, sowie einen Getter, um die Summe auszulesen. Die Methode combine() wird zunächst nicht benötigt. Aufmerksame Leser mögen hier jedoch einen Bezug zum dritten Parameter von collect() erkennen.

public static void main(String[] args) throws InterruptedException {
List<Long> numbers = Utils.createNumbers();
int size = numbers.size();
SummingUnit summingUnit = new SummingUnit();

Thread thread = new Thread(new SumTask(numbers, 0,
size / 2, summingUnit));
thread.start();

SumTask sumTask = new SumTask(numbers, size / 2,
size, summingUnit);
sumTask.run();
thread.join(); // wait for thread to complete

System.out.println("Sum: " + summingUnit.getSum());
}


private static class SumTask implements Runnable {

private final List<Integer> _numbers;
private final int _start;
private final int _end;
private final SummingUnit _summingUnit;

public SumTask(List<Integer> numbers,
int start,
int end,
SummingUnit summingUnit) {
_numbers = numbers;
_start = start;
_end = end;
_summingUnit = summingUnit;
}

@Override
public void run() {
for (int i = _start; i < _end; i++) {
_summingUnit.sum(_numbers.get(i));
}
}
}