Szenario 1: MapReduce
Die betrachtete Anwendung ist eine Dokumentenverwaltung, mit der Benutzer unter anderem Dokumente miteinander teilen und mit Tags versehen können. Jedes Dokument ist also mit einer Menge von Benutzern und einer Menge von Tags assoziiert. Eine mögliche Aufgabe wäre nun, für jeden Benutzer die Menge aller Tags zu berechnen, die sich an den für ihn sichtbaren Dokumenten befinden. Nützlich ist die Information zum Beispiel um jedem Benutzer eine Liste von Suchvorschlägen bestehend aus den Tags aller seiner Dokumente anzubieten.
Diese Aufgabenstellung ist ein typischer Anwendungsfall für MapReduce. Ein MapReduce-Job iteriert zunächst über die Eingabedaten (alle Dokumente) und extrahiert bestimmte Daten (deren Benutzer und Tags). Anschließend reduziert er diese zu einem Ergebnis (eine Tag-Menge pro Benutzer).
Für den ThreadPoolExecutor wird ein Threadpool fester Größe mit numThreads Worker Threads verwendet. Die Menge der Dokumente wird in numThreads gleich große Teilmengen aufgespalten, die als Tasks zur Bearbeitung einzuplanen sind. Jeder Task führt einen Teil des MapReduce-Jobs für seine Dokumente durch. Sobald der ThreadPoolExecutor alle Tasks fertiggestellt hat, werden deren Ergebnisse in einem abschließenden Schritt miteinander kombiniert.
Der ForkJoinPool wird ebenfalls mit numThreads Worker Threads initialisiert. numThreads beschreibt hier die ungefähre Anzahl an Worker Threads, die aktiv an Tasks arbeiten. Das zu beachten ist wichtig, da ein Thread gegebenenfalls erst auf die Ergebnisse abgezweigter Tasks warten muss, bevor er die ursprüngliche Task abschließen kann. Der Thread versucht dann zwar zunächst, ausgewählte Tasks von anderen Threads auszuführen, statt einfach nur zu warten. Sollte das allerdings nicht möglich sein, wird er in den Ruhezustand versetzt. In solchen Situationen behält sich der ForkJoinPool vor, den Ausfall durch einen zusätzlichen Worker Thread zu kompensieren. Daher lässt sich die Thread-Anzahl beim ForkJoinPool nur näherungsweise kontrollieren.
Um die Formulierung von MapReduce-Jobs für den ForkJoinPool allgemein halten zu können, lässt sich im vorliegenden Beispiel eine Abstraktion durch die Definition von zwei Interfaces erreichen:
public interface Input<T> {
boolean shouldBeComputedDirectly();
Output<T> computeDirectly();
List<MapReduceTask<T>> split();
}
public interface Output<T> {
Output<T> reduce(Output<T> other);
T getResult();
}
Der Typparameter T steht für den Ausgabetyp des gesamten MapReduce-Jobs. Input<T> stellt die Eingabedaten für einen Task dar und kann Auskunft darüber geben, ob der Task weiter zu zerlegen oder direkt zu berechnen ist (shouldBeComputedDirectly()). Entsprechend kann Input<T> sich direkt berechnen (computeDirectly()) oder in neue Tasks zerlegen lassen (split()). Output<T> repräsentiert das Ergebnis eines Tasks und unterstützt, neben dem Zusammenführen mehrerer solcher Ergebnisse zu einem gemeinsamen (reduce()), das Abfragen des Gesamtergebnisses (getResult()).
Mit den beiden Interfaces lässt sich eine Klasse MapReduceTask<T> definieren. Da sie ein Ergebnis zurückliefern soll, ist sie von RecursiveTask abgeleitet.
public class MapReduceTask<T> extends
RecursiveTask<Output<T>> {
private final Input<T> input;
public MapReduceTask(Input<T> input) {
this.input = input;
}
@Override
protected Output<T> compute() {
if (input.shouldBeComputedDirectly()) {
return input.computeDirectly();
}
List<MapReduceTask<T>> subTasks = input.split();
for (int i = 1; i < subTasks.size(); i++) {
subTasks.get(i).fork();
}
Output<T> result = subTasks.get(0).compute();
for (int i = 1; i < subTasks.size(); i++) {
result = result.reduce(subTasks.get(i).join());
}
return result;
}
}
Sobald ein Worker Thread ein Task ausführt, wird die compute()-Methode aufgerufen. Während der Abarbeitung wird zunächst überprüft, ob der Task direkt zu berechnen ist. Andernfalls ist die Eingabe in neue Tasks aufzuteilen. Alle bis auf einen werden mit fork() in die lokale Task Queue eingeplant, wo sie anderen Threads zum Bearbeiten zur Verfügung stehen. Der verbleibende Task wird durch den Aufruf von compute() ausgeführt (was zu einem weiteren Zerlegen führen kann). Abschließend wartet das Programm mit join() auf die Ergebnisse der abgezweigten Tasks, die es zusammenführt und als Gesamtes zurückgibt.
Die eigentliche "Business-Logik" (die Berechnung der Tags) befindet sich in den Implementierungen von Input<T> und Output<T>, für die auf das GitHub-Repository verwiesen sei.
Ab sofort kann man sich mit Vorträgen für die neue Konferenz zu Agile ALM, Continuous Delivery und DevOps bewerben.



