Programmieren mit Tasks und parallelen Schleifen

Datenparallelität

Anzeige

Tasks erlauben es, unterschiedliche Funktionen parallel auszuführen. Eine andere Quelle der Parallelität ist die Ausführung einer Funktion auf einer Menge gleichartiger Daten durch Schleifen. Parallele Schleifen folgen dem SIMD-Prinzip (Single Instruction, Multiple Data) und bilden die Grundlage für die Verarbeitung regulärer Datenstrukturen wie Felder. Ein Paradebeispiel dafür sind Algorithmen aus der Bildverarbeitung, bei denen sich alle Pixel eines Bilds im Prinzip parallel verarbeiten lassen.

Sofern zwischen den Iterationen einer Schleife keine Abhängigkeiten bestehen, könnte man für jede Iteration einen Task erzeugen, der den Schleifenrumpf ausführt. In der Praxis ist das jedoch ineffizient, wenn das Feld groß ist und eine Iteration nur wenig Zeit in Anspruch nimmt. Wesentlicher effizienter ist es, wie beim Quicksort-Beispiel, die Datenstruktur in Blöcke zu unterteilen und diese jeweils sequenziell zu verarbeiten. Das bedeutet, statt für jede Iteration nur für jeden Block einen eigenen Task zu erzeugen. Glücklicherweise muss man sich um die Partitionierung in der Regel nicht kümmern; die meisten Sprachen und Bibliotheken bieten fertige Konstrukte für parallele Schleifen an. In TBB gibt es das Funktions-Template parallel_for, das neben dem Iterationsbereich den Schleifenrumpf in Form einer Lambda-Funktion oder eines Funktionsobjekts entgegennimmt. Damit lässt sich zum Beispiel das Inkrementieren der Elemente eines Vektors leicht bewerkstelligen:

std::vector<int> v;
// ... Initialisierung
tbb::parallel_for(size_t(0), v.size(), [&v]
(size_t i) {vv[i]++;});

Optional kann man parallel_for eine Schrittweite übergeben. Für komplexere Schleifen, insbesondere für solche über mehrdimensionalen Bereichen, stellt TBB Varianten von parallel_for zur Verfügung, die es erlauben, auf die Partitionierung Einfluss zu nehmen. Zudem gibt es mit parallel_do ein Funktions-Template, das es erlaubt, der zu verarbeitenden Sequenz dynamisch weitere Elemente hinzuzufügen.

Java 7 bietet noch keine direkte Unterstützung für Datenparallelität. Der Grund dafür ist vor allem, dass Lambda-Funktionen noch nicht Teil der Sprache sind. Datenparallele Operationen lassen sich damit wesentlich besser handhaben, was die Implementierungen in .NET und TBB zeigen. Es sind allerdings parallele Array-Operationen in Planung.

Parallele Schleifen in C# sehen wiederum ähnlich zu denen in TBB aus:

int[] v; 
// ... Initialisierung
Parallel.For(0, v.Length, i => {v[i]++;});

Die zweite Form von parallelen Schleifen in C# dient der parallelen Verarbeitung von Aufzählungen (IEnumerable) mit Parallel.ForEach. Die obige Schleife würde damit wie folgt aussehen:

Parallel.ForEach(Enumerable.Range(0, v.Length), i 
=> {v[i]++;});

Ein Vorteil von Parallel.ForEach ist, dass die Daten zu Beginn der Verarbeitung nicht vollständig vorliegen müssen. Eine Datenstruktur mit IEnumerable-Schnittstelle lässt sich zum Beispiel durch einen Thread füllen und durch einen anderen Thread abarbeiten. Daher ist auch die Partitionierungsstrategie anders als bei Parallel.For. Es wird zunächst mit kleinen Blöcken begonnen, um schnell möglichst vielen Kernen Arbeit zu verschaffen. Dann wird die Blockgröße sukzessive erhöht. Bei Parallel.ForEach lassen sich zudem eigene oder vorgefertigte Strategien für die Partitionierung des Iterationsbereichs angeben.

Eine Falle, die man beim Umgang mit parallelen Schleifen tunlichst meiden sollte, betrifft die Initialisierung der Datenstrukturen. Bei Rechnern mit verteiltem, gemeinsamem Speicher, sogenannten NUMA-Systemen (Non-Uniform Memory Access), steht zum Zeitpunkt der Allokation eines Speicherbereichs die Abbildung der virtuellen auf die physikalischen Speicherseiten in der Regel noch nicht fest. Bei den meisten Betriebssystemen kommt hierfür die Strategie des First-touch Placement zum Einsatz, bei der eine Seite im Speicher des Prozessors landet, der als Erster darauf zugreift. Auf diese Weise soll eine Verteilung der Speicherbereiche unter Beibehaltung einer möglichst hohen Datenlokalität erreicht werden. Initialisiert man nun eine große Datenstruktur sequenziell, landen die Speicherseiten mit hoher Wahrscheinlichkeit in nur einem physikalischen Speicher, der bei späteren parallelen Zugriffen einen Flaschenhals bildet. Deshalb sollte die Initialisierung parallel erfolgen.

Auch Schleifen mit Datenabhängigkeiten zwischen den Iterationen lassen sich in vielen Fällen parallelisieren. Eine häufig anzutreffende Abhängigkeit ist die Akkumulation von Werten in einer gemeinsamen Variable. Solche Berechnungen bezeichnet man als Reduktionen oder Aggregationen. Betrachtet sei zunächst die sequenzielle Berechnung der Summe der Elemente eines Vektors in C++:

std::vector<double> v;
// ... Initialisierung
double sum = 0;
for(double x : v)
sum += x;

Ein naheliegender Ansatz für die Parallelisierung dieser Schleife ist, die Zugriffe auf sum zu synchronisieren, sodass sie sich mit parallel_for oder vergleichbaren Funktionen ausführen lässt. Dieser Ansatz ist jedoch alles andere als optimal, da Zugriffe auf gemeinsame Variablen zwangsläufig ein Nadelöhr sind. Eine signifikante Beschleunigung ist deshalb nicht zu erwarten. Wesentlich besser ist es, blockweise Teilsummen zu berechnen und diese dann aufzuaddieren.

TBB bietet für Reduktionen das Funktions-Template parallel_reduce an, das nicht auf die Addition von Werten beschränkt, sondern universell einsetzbar ist. Deshalb erfordert die Summenberechnung etwas Schreibarbeit (siehe auch das folgende Beispiel). Als ersten Parameter übergibt man parallel_reduce den zu reduzierenden Bereich in Form eines Objekts vom Typ blocked_range. Darauf folgt der Wert 0.0, der als Startwert für die Summenberechnung verwendet wird. Der dritte Parameter ist eine Lambda-Funktion, die für einen gegebenen Block die Teilsumme berechnet. Als letzten Parameter übergibt man ebenfalls eine Lambda-Funktion, die für jeweils zwei Blöcke die Teilsummen aufaddiert. Auf die gleiche Weise kann man Produkte, Minima, Maxima etc. berechnen. Einzige Voraussetzung ist, dass die zugrunde liegende Operation assoziativ im mathematischen Sinne ist. Aufgrund von Rundungsfehlern bei Gleitkommazahlen ist das Ergebnis einer parallelen Reduktion jedoch nicht unbedingt identisch mit dem eines sequenziellen Algorithmus.

double SumVectorElements(const std::vector<double>& v) {
return tbb::parallel_reduce(
// zu reduzierender Bereich
tbb::blocked_range<size_t>(size_t(0), v.size()),
// Startwert (neutrales Element der Addition)
0.0,
// Berechnung der Teilsumme eines Blocks
[&v] (const tbb::blocked_range<size_t>& range, double x) -> double {
for(size_t i = range.begin(); i != range.end(); i++)
x += v[i];
return x;
},
// Aufaddieren der Teilsummen zweier Blöcke
[] (double x, double y) {return x + y;});
}

Sowohl Parallel.For als auch Parallel.ForEach in C# unterstützen Thread-lokalen Speicher für Zwischenergebnisse, was die parallele Aggregation vereinfacht. Das folgende Listing zeigt die Summenberechnung mit der generischen Variante von Parallel.For. Der angegebene Typ (double) ist der Typ der Thread-lokalen Daten. Es folgen der Iterationsbereich und eine Funktion, die die Thread-lokalen Daten initialisiert. Das nächste Argument ist eine Funktion, die die eigentliche Arbeit verrichtet. Sie bekommt von der Parallel.For-Implementierung eine Thread-lokale Variable übergeben, in der sie die jeweilige Teilsumme speichert. Eine Synchronisation ist nicht erforderlich, da per Definition nur ein Thread die Variable verwendet. Das letzte Argument umfasst den Aggregationsschritt, den am Schluss jeder beteiligte Thread ausführt. Erst hier wird durch Einsatz einer Sperre die Gesamtsumme gebildet. Da das aber nur einmal pro Thread und nicht einmal pro Task passiert, ist der Aufwand für die Synchronisation vernachlässigbar.

static double SumArrayElements(double[] a) {
double sum = 0;
object sumLock = new object();
Parallel.For<double>(
0, // von (inklusive)
a.Length, // bis (exklusive)
() => 0, // Initialisierung der Thread-lokalen Teilsummen
// Schleifenrumpf für die Berechnung der Teilsummen:
(i, loopState, threadLocalSum) => {
return threadLocalSum + a[i];
},
// Aggregationsfunktion für die Berechnung der Gesamtsumme:
partialSum => {
lock (sumLock) sum += partialSum;
}
);
return sum;
}

Die Implementierung leidet jedoch an der geringen Anzahl der im Schleifenrumpf ausgeführten Befehle. Allein dass in jeder Iteration ein Funktionsaufruf stattfindet, kostet viel Zeit. Dem lässt sich durch das Verwenden eines Partitioners entgegenwirken. Das nächste Beispiel zeigt das Endergebnis im "Vollausbau". Anstelle des Iterationsbereichs übergibt man einen Partitioner, und im Schleifenrumpf verarbeitet man mit einer sequenziellen for-Schleife jeweils einen ganzen Block.

static double SumArrayElementsPartitioner(double[] a) {
double sum = 0;
object sumLock = new object();
Parallel.ForEach<Tuple<int, int>, double>(
Partitioner.Create(0, a.Length),
() => 0,
(range, loopState, threadLocalSum) => {
double rangeSum = 0;
for (int i = range.Item1; i < range.Item2; i++) {
rangeSum += a[i];
}
return threadLocalSum + rangeSum;
},
partialSum => {
lock (sumLock) sum += partialSum;
}
);
return sum;
}
Anzeige