Parallele Datensammlung mit Java 8

Race Conditions

Hier verarbeitet SumTask die Liste mit den Zahlen partiell. In der Hauptroutine wird SumTask dazu einmal mittels neuem Thread (therad.start), das zweite Mal direkt sumTask.run) aufgerufen. Vor der Ausgabe der Summe wartet das Programm mittels thread.join() auf die Beendigung des Threads, um sicherzustellen, dass die Berechnung auch abgeschlossen ist. Trotz der Maßnahme gibt das Programm bei mehrfacher Ausführung unterschiedliche Ergebnisse aus. Wie kann das sein?

Race Condition

Betrachtet man die Summierung:

sum += val;

so ist das, was nach einer einfachen Operation aussieht, in Wirklichkeit eine ganze Befehlsfolge. Es handelt sich um syntaktischen Zucker, denn die Anweisung lässt sich auch anders schreiben.

sum = sum + val;

Ein gewöhnlicher Computer kann nicht einfach zu einer Variable, die irgendwo im Speicher liegt, einen Wert addieren. Vielmehr erfolgt die Addition in einem Prozessorregister. Damit ergibt sich eine Anweisungsfolge, ähnlich folgendem Pseudocode:

register.load(sum)
register.add(val)
register.write(sum)

Der Wert der Variablen sum wird in ein Register geladen, der Wert von val addiert und das Ergebnis in der Variablen sum abgelegt. Diese drei prinzipiellen Operationen laufen nicht als atomare Einheit ab. Vielmehr lassen sich die Operationen der unterschiedlichen Threads ineinander schachteln, wobei jeder Thread ein eigenes Register nutzen kann. Betrachtet sei ein möglicher Ablauf zweier Threads. Dabei wird ein initialer Wert von sum=0 angenommen. Thread 1 verarbeitet val=5 und Thread 2 val=3. Jeder Thread hat sein eigenes Register.

Operation Ergebnis
Thread 1 register1.load(sum) register1 = 0
Thread 1 register1.add(5) register1 = 5
Thread 1 register1.write(sum) sum = 5
Thread 2 register2.load(sum) register2 = 5
Thread 2 register2.add(3) register2 = 8
Thread 2 register2.write(sum) sum = 8

Da beide Threads unabhängig von- und parallel zueinander werkeln, kann die Reihenfolge der Operationen auch anders erfolgen, beispielsweise so:

Operation Ergebnis
Thread 1 register1.load(sum) register1 = 0
Thread 1 register1.add(5) register1 = 5
Thread 2 register2.load(sum) register2 = 0 (sum ist noch 0!)
Thread 1 register1.write(sum) sum = 5
Thread 2 register2.add(3) register2 = 3
Thread 2 register2.write(sum) sum = 3

In diesem Beispiel wird register2 geladen, bevor der erste Thread sein Ergebnis zurückgeschrieben hat. Somit ist sum noch 0. Wie leicht zu sehen ist, enthält sum nach Ablauf beider Additionen den falschen Wert. Bei dem Problem handelt es sich um eine sogenannte Race Condition. Das Ergebnis ist davon abhängig, wer im Wettlauf um die Operation auf die gemeinsame Variable vorne liegt. Die Methode sum() wird so zu einer kritischen Sektion. Eine solche ist vor konkurrierendem Zugriff zu schützen. Java bietet hierfür verschiedene Mechanismen. Beispielsweise können Entwickler vor Eintritt in die kritische Sektion eine Sperre (lock) setzen oder sie überlassen das Java, indem sie die Methode mit dem Qualifier synchronized versehen.

public synchronized void sum(long val) 

Damit sperren sie die Methode automatisch für andere Threads, solange ein ebensolcher diese durchläuft. Praktisch werden damit die Aufrufe serialisiert, einer nach dem anderen, auf Kosten der Parallelität. Das kann nicht die gewünschte Lösung sein.

Java bietet das Schlüsselwort volatile, mit dem sich die Deklaration der Variablen sum versehen lässt. Dadurch sichert Java lediglich zu, dass nach dem Schreiben in diese Variable genau deren neuer Wert gelesen wird. Das hilft aber im beschriebenen Szenario nicht weiter. (Allerdings könnte ohne volatile auch der erste Ablauf fehlschlagen: Obwohl die 5 nach sum geschrieben wurde, könnte die folgende Operation aufgrund alten Cache-Inhalts noch eine 0 lesen.)

Kritische Sektionen vermeiden

Die gewünschte Lösung muss vielmehr darin liegen, kritische Sektionen wo immer möglich zu vermeiden. Im Beispiel lässt sich das dadurch erreichen, dass jeder Thread mit einer eigenen SummingUnit arbeitet. Damit gibt es keine gemeinsam genutzte Variable mehr. Stattdessen sind die Teilergebnisse der einzelnen SummingUnits am Ende zusammenzutragen – hier greift die combine()-Methode.

public static void main(String[] args) throws InterruptedException {
List<Long> numbers = Utils.createNumbers();
int size = numbers.size();
SummingUnit summingUnit = new SummingUnit();
SummingUnit summingUnit2 = new SummingUnit();

Thread thread = new Thread(new SumTask(numbers, 0,
size / 2, summingUnit));
thread.start();

SumTask sumTask = new SumTask(numbers, size / 2,
size, summingUnit2);
sumTask.run();
thread.join(); // wait for thread to complete

summingUnit.combine(summingUnit2);
System.out.println("Sum: " + summingUnit.getSum());
}