Verarbeiten großer verteilter Datenmengen mit Hadoop

Programmiermodell und Framework

Mit MapReduce sind oft zwei Dinge gemeint. Zum einen versteht man es als Programmiermodell und zum anderen bezeichnet man damit MapReduce-Frameworks. Letztere arbeiten nach dem MapReduce-Modell, unterscheiden sich aber durch die Wahl der Programmiersprache und in den Implementierungsdetails. Google hat MapReduce als Programmiermodell für die hochparallele Datenverarbeitung in Clustern ausgelegt. Es hat als Konzept seine Wurzeln in Lisp, das die beiden Funktionen map() und reduce() kennt, die auch als Namensgeber für MapReduce fungierten. Lisps map()-Funktion wird mit einer Liste von Eingabewerten und einer Funktion als Parameter aufgerufen, damit es übergebene Funktion auf jeden Wert der Liste anwendet.

Die Ergebnisse der Funktionsaufrufe lassen sich in Listenform als map()-Ergebnis zurückgegeben. reduce() arbeitet ähnlich zu map(), indem man es mit einer Funktion und einer Liste mit Eingabewerten aufruft und jeden Wert aus der Eingabeliste der Funktion übergibt. Im Gegensatz zu map() führt reduce() alle Ergebnisse zusammen und gibt das Ergebnis als skalare Werte zurück.

Das MapReduce-Programmiermodell lehnt sich daran an und unterteilt die Verarbeitung in eine Map- und eine nachfolgende Reduce-Phase, die verteilt auf allen Rechnern eines Clusters ausgeführt werden. Die konkrete Implementierung der Map- und der Reduce-Funktion gibt der Programmierer vor, dem alle Freiheiten offen stehen. Da MapReduce für die Umsetzung in Rechnerclustern ausgelegt wurde, ist von der Annahme auszugehen, dass Worker jede der beiden Phasen auf mehreren Knoten des Clusters parallel ausführen können.

Architektur von Hadoop (Abb. 1)

Hierfür instanziiert das darunter liegende Framework eine bestimmte Anzahl Worker mit der vorgegebenen Map-Funktion, teilt die zu verarbeitenden Daten in als Splits bezeichnete Teile auf und weist sie den Workern zu. Jeder Worker führt daraufhin die Map-Funktion auf den ihm zugewiesenen Splits aus. Die Eingabedaten lassen sich durch das Hadoop-Framework als Schlüssel-Wert-Paare aufbereiten und der Map-Funktion übergeben, die sie verarbeitet und dabei eine beliebige Menge von als Intermediate-Werte bezeichneten Ausgabepaaren erzeugen kann. Das folgende Listing stellt das als Pseudocode für das Zählen von Wortlängen in Textdokumenten dar.

;; lineno: Zeilennummer
;; line: Textzeile
map(Key lineno, Value line)
for each word w in line:
EmitIntermediatePair(w, length(w));

Das Framework empfängt die Intermediate-Werte und verteilt sie auf die die Reduce-Funktion ausführenden Worker, wobei das Framework sicherstellt, dass alle Intermediate-Paare mit dem gleichen Schlüsselwert zusammenfasst und an die gleiche Reducer-Instanz übergeben werden. Die Ausgabe der Reduce-Funktion stellt das Ergebnis der MapReduce-Anwendung dar. Ein zentraler Master, der die verfügbaren Worker einer der beiden Phasen zuordnet, koordiniert und überwacht die MapReduce-Anwendungen. Die Abbildung 1 stellt die Verarbeitung und die einzelnen Phasen dar.

Verarbeitungsschritte einer Hadoop-Mapreduce-Anwendung (Abb. 2)

Hadoops MapReduce-Framework arbeitet nach dem oben beschriebenen Modell. Die Anwendungen für Hadoop bezeichnet man als Jobs, für deren Verteilung im Cluster und anschließenden Verwaltung der als Master fungierende JobTracker zuständig ist. Die eigentliche MapReduce-Job-Ausführung übernimmt der TaskTracker, von dem je eine Instanz auf einem Clusterknoten läuft. Üblicherweise arbeiten auf einem Knoten immer eine TaskTracker- und eine DataNode-Instanz als Paar, die das Zusammenspiel von HDFS und MapReduce-Framework ermöglichen. Abbildung 2 gibt einen Überblick über die Architektur und Abbildung 3 stellt einen Beispielcluster mit fünf Knoten dar.

Aufbau eines Beispielclusters mit fünf Knoten (Abb. 3)

Zur Überwachung eines Hadoop-Clusters stellen der NameNode für das HDFS und der JobTracker für MapReduce eine spartanische Weboberfläche zur Verfügung, auf der die wichtigsten Parameter zur Verfügung stehen, etwa für HDFS-Angaben zur Speicherkapazität und ihre Auslastung sowie die Anzahl der aktiven Knoten beziehungsweise der ausgefallenen Knoten im Cluster. Die Oberfläche des JobTrackers beschränkt sich ebenfalls auf eine kurze Übersicht der laufenden und auf Ausführung wartenden Jobs, stellt aber für jeden einzelnen Job detaillierte Informationen zum Fortschritt insgesamt, den ausgeführten Tasks und der durch Hadoop verwendeten Parameter bereit. Letzteres stellt eine gute Informationsquelle für spätere Optimierungen dar. Wer einen tieferen Einblick in die inneren Vorgänge in einem Hadoop-Cluster gewinnen möchte, sollte sich die von Hadoop-Framework via log4j geschriebenen Logfiles ansehen, die für jede Serverinstanz des Clusters geschrieben werden.

Zwar hilft das gut, die internen Vorgänge zu verstehen, aber nicht, einen Cluster an sich zu überwachen. Daher unterstützt Hadoop die Monitoring-Lösung Ganglia, aber auch Nagios lässt sich in Verbindung mit Hadoops JMX-Support einsetzen.