Verarbeiten großer verteilter Datenmengen mit Hadoop, ein Beispiel

Werkzeuge  –  Kommentare

Standen bei einem früheren Artikel das MapReduce-Programmiermodell sowie die Apache-Implementierung Hadoop im Zentrum, führt heise Developer nun in die Programmierung von MapReduce-Anwendungen mit dem Framework ein. Beispielhaft sei ein MapReduce-Job entwickelt, auf dessen Grundlage der Leser erste eigene Jobs umsetzen kann.

Das Hadoop-Framework übernimmt einen Großteil der für einen MapReduce-Job benötigten Arbeiten, sodass sich der Entwickler in den meisten Fällen auf die Implementierung der Map- und der Reduce-Funktion konzentrieren kann. Die Abbildung 1 stellt das Zusammenspiel zwischen dem Entwickler und dem Hadoop-Framework dar. Im ersten Schritt teilt der Entwickler dem Framework mit, wo es im Hadoop Distributed File System (HDFS) die zu verarbeitenden Daten findet und mit welchem Eingabeformat es diese einlesen soll (Punkt 1).

Zusammenspiel zwischen Entwickler und Hadoop (Abb. 1)

Zu beachten ist an diesem Punkt, dass Hadoop jeden Block einer Datei komplett in den Speicher einliest, unabhängig davon, wie viele der enthaltenen Datensätze es verarbeiten soll. Dabei sei eine Prämisse vorangestellt: Durch die wesentlich höhere Geschwindigkeit des Hauptspeichers im Vergleich zur Lesegeschwindigkeit von Festplatten ist es effizienter, einen ganzen Block in den Hauptspeicher einzulesen und nicht verwendete Daten einfach zu verwerfen, statt die benötigten Blockabschnitte auf der Festplatte zu suchen. Das ist der Grund für das Fehlen einer fseek()-ähnlichen Funktion, wodurch unter anderem das Einfügen von Daten in bestehende Dateien nicht möglich ist. Da man somit keinen Random Access (Direktzugriff) auf Daten realisieren kann, muss Hadoop alle Daten in der bereitgestellten Reihenfolge verarbeiten. Das Verfahren bezeichnet man als Streaming Access.

Hadoop stellt Entwicklern fertige Klassen zur Verfügung, die das Einlesen der Daten aus dem HDFS und ihre Aufbereitung in die für die Map-Funktion benötigten Key-Value-Paare abnehmen. Lediglich den Pfad im HDFS zu den zu verarbeitenden Dateien muss der Anwender angeben, damit das Framework die zu lesenden Dateien bestimmen kann. Sind sie festgelegt, ermittelt das Framework die zu lesenden Blöcke auf den DataNodes im Cluster und legt fest, wie viele Map-Tasks durch TaskTracker auf welchen Knoten auszuführen sind. Damit folgt Hadoop dem Lokalitätsbestreben von MapReduce, die Berechnung immer bei den Daten auszuführen. Mit dem gewählten Eingabeformat teilt das Framework die Blöcke in Splits auf und erzeugt die Schlüssel-Wert-Paare für den Mapper. Hadoop startet mit diesen Informationen und der gegebenen Mapper Implementierung die TaskTracker für die Map-Phase (Punkte 3 und 4).

Da die Ausgabe der Mapper gleichzeitig die Eingabe für die Reduce-Phase darstellt, muss Hadoop als Nächstes für jedes ausgegebene Paar entscheiden, an welchen der Reducer es das Paar schicken soll. Hadoops Standardimplementierung stellt über den Hash-Wert des Schlüssels der erzeugten Schlüssel-Wert-Paare sicher, dass alle Werte mit dem gleichen Wert an dieselbe Reducer-Instanz gehen. Ein abweichendes Verhalten lässt sich über eine eigene Implementierung des Interfaces Partitioner erreichen (Punkte 5 und 6).

Der Entwickler kann neben der Implementierung die Anzahl zulässiger Reducer-Instanzen bestimmen (Punkt 7). Da die Reducer die endgültige Ausgabe des Jobs erzeugen und im HDFS ablegen, bedeutet jede Instanz eine Ausgabedatei mit einem Teilergebnis des Jobs. Möchte man das Ergebnis nur in einer Datei ausgeben, lässt sich das am einfachsten durch die Beschränkung auf einen Reducer erreichen. Andere Wege sind durch zusätzliche Verarbeitungsschritte ebenfalls denkbar. Analog zum Eingabeformat muss der Hadoop-Entwickler Ausgabeformat und -ort im HDFS angeben.

Seit Hadoop 0.20.1 gibt es eine neue API für MapReduce-Anwendungen, sodass zuerst zu entscheiden ist, ob man mit der alten oder der neuen API programmieren möchte. Bei Neuentwicklungen sollte man der neuen den Vorrang geben, wenn sich auch noch die meisten Beispiele im Internet auf die alte API beziehen. Die Beispiele in diesem Artikel verwenden nur die neue API.

Die Hauptaufgabe für die Entwicklung einer MapReduce-Anwendung besteht in der Umsetzung der Mapper- und der Reducer-Funktion. Hierfür stellt Hadoop in der aktuellen Version die generischen Klassen Mapper und Reducer in einer Default-Implementierung als Identitätsfunktion zur Verfügung, wie sie Listing 1 und 2 auszugsweise zeigen. Eigene Implementierungen sind von den Klassen abzuleiten.

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void map(KEYIN key, VALUEIN value,
Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
protected void reduce(KEYIN key, Iterable<VALUEIN> values,
Context context)
throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
}

Nach der Umsetzung von Mapper und Reducer muss die Konfiguration des MapReduce-Jobs erfolgen, wofür das Framework zwei zentrale Klassen bereitstellt: Job und Configuration. Über Letztere kann der Entwickler jobspezifische Parameter an die Mapper- und Reducer-Implementierung übergeben oder Defaultwerte des Hadoop-Frameworks verändern. Mit der Klasse Job teilt der Entwickler dem Framework mit, welche Implementierung es für die einzelnen Teilaufgaben des Jobs verwenden werden soll.

Listing 6 zeigt die Konfiguration des Beispiel-Jobs. Zu beachten ist, dass man dem Job immer Klassen und keine konkreten Objekte übergibt, da die konkreten Instanzen erst zur Ausführungszeit auf den unterschiedlichen Knoten instanziiert werden.