Verarbeiten großer verteilter Datenmengen mit Hadoop

Hadoop Distributed File System

HDFS folgt dem Vorbild des Google File System (GFS). Daher stimmt es in wichtigen Eigenschaften, wie sie das von Google veröffentlichte Paper "The Google Filesystem" beschreibt, überein. Google ging bei der Entwicklung des GFS von der Annahme aus, dass in Rechnerclustern der Ausfall einzelner Knoten keine Ausnahme, sondern der Normalfall darstellt, wenn für den Clusteraufbau auf teure Spezial- zugunsten preiswerterer Standardhardware verzichten wird. Daher war Fehlertoleranz gegenüber Hardwareausfällen ein Hauptdesignziel für GFS. Daneben war es für Google notwendig, über ein verteiltes Dateisystem zu verfügen, das dem hohen Datenvolumen gerecht wird und sich dafür leicht durch das Hinzufügen neuer Knoten zum Cluster erweitern lässt.

Im Rahmen des Master-Slave-Prinzips übernimmt die Rolle des Masters beim HDFS der NameNode, der alle Metadaten des Filesystems wie Verzeichnisstrukturen und Dateien verwaltet. Die sogenannten DataNodes übernehmen die Rolle der Slaves und sind für die Verwaltung der Nutzdaten im HDFS auf den einzelnen Clusterknoten zuständig.

Für einen Benutzer sieht das HDFS wie ein normales hierarchisches Dateisystem aus, es ist aber von seinen Funktionen her nicht mit anderen verteilten Dateisystemen wie dem Andrew File System vergleichbar, da es für die Entwicklung von MapReduce-Anwendungen optimiert ist. Genauso wie das GFS geht HDFS davon aus, dass Dateien nur einmal geschrieben, dafür aber häufig gelesen werden, sodass Operationen zum Lesen effizienter sein müssen als die zum Schreiben. Eine weitere Besonderheit stellt das Fehlen von Operationen zum bytegenauen Zugriff auf Dateien dar, wie sie unter anderem von der C-Funktion fseek() her bekannt sind.

Mit zunehmender Größe des Clusters wächst auch die Wahrscheinlichkeit eines Hardwareausfalles. Das sollte in GFS/HDFS-Cluster unter normalen Umständen nicht zu Datenverlust führen. Daher darf keine Datei nur auf einem Knoten liegen, dessen alleiniger Ausfall zu Datenverlust führen würde. Die hierfür benötigte Datensicherheit erreicht das HDFS durch Replikation aller Nutzdaten.

Statt jedoch ganze Dateien auf unterschiedliche Knoten zu replizieren, wählten die Entwickler des GFS/HDFS einen blockorientierten Ansatz. Mit dem Ansatz wird jede im HDFS abgelegte Datei in einzelne Blöcke mit einer festen Bytegröße aufgeteilt und durch den NameNode auf unterschiedliche Clusterknoten abgelegt. Zusätzlich wird durch den NameNode sichergestellt, dass jeder Block mehrfach auf unterschiedliche Knoten im Cluster repliziert wird.

In der HDFS-Standardkonfiguration repliziert der NameNode jeden Block dreifach. Kommt es nun zum Ausfall eines Knotens im Cluster, gehen nur die auf dem ausgefallenen Rechners sich befindlichen Blöcke verloren und keine ganze Datei, da die verloren gegangenen Blöcke durch ihre Kopien auf anderen Knoten ersetzbar sind und so der NameNode die ganze Datei bereitstellen kann.

Der NameNode, der alle im HDFS existierenden Verzeichnisse und Dateien kennt, muss daher zu jeder Datei wissen, in welche Blöcke sie aufgeteilt wurde, auf welchem Cluster-Knoten sie liegen und welche Kopien der einzelnen Blöcke es gibt. Zu deren Verwaltung laufen auf dem Cluster-Knoten die sogenannten DataNodes. Im Gegensatz zum NameNode, von denen es nur einen pro HDFS-Instanz gibt, verwalten die DataNodes nur die einzelnen Blöcke, die auf ihrem Knoten abgelegt wurden. Über die Information welche Blöcke zu welcher Datei gehören, verfügt nur der NameNode.

Soll ein Client beispielsweise eine Datei aus dem HDFS lesen, baut er zuerst eine Verbindung zum NameNode auf. Der ermittelt, welche Blöcke zu der angeforderten Datei gehören und auf welchen DataNodes sie liegen. Für den Datentransfer sind nun der anfragende Client und die DataNodes zuständig, da der Datentransfer nicht zur Aufgabe des NameNode gehört. Die Aufteilung stellt sicher, dass der NameNode nicht überlastet wird und sich Transfers dezentral durchführen lassen, während der NameNode für andere Aufgaben zur Verfügung steht.

Ähnlich verhält sich das Prozedere beim Übertragen einer Datei ins HDFS. Der Client teilt dem NameNode mit, dass er eine Datei anlegen soll, worauf der NameNode für die neue Datei einen Eintrag in seinem Namespace erstellt und der Client mit der Datenübertragung anfangen kann. Dafür baut der Client lokal aus den zu übertragenden Daten die Byteblöcke für das HDFS auf. Sobald ein Block seine vorgegebene Größe erreicht hat, fordert der Client beim NameNode Speicherplatz für ihn an. Der NameNode bestimmt dann eine Liste von DataNodes, auf denen der Block mit seinen Kopien abzulegen ist.

Für die eigentliche Übertragung kommt anschließend ein als Pipelining bezeichneter Prozess zum Einsatz. Der Client überträgt den Block zum ersten DataNode aus der erhaltenen Liste. Sobald er den Block erfolgreich auf seinem lokalen System abgelegt hat, transferiert er ihn weiter zum nächsten DataNode aus der Liste. Der Replikationsvorgang wiederholt sich, bis die zu erreichende Anzahl von Blockkopien erreicht ist. Allerdings ist er für den Client transparent, der in der Zwischenzeit den nächsten Block für das HDFS aufbauen und übertragen kann. Der Transfer der gesamten Datei findet erst ein Ende, wenn der NameNode sichergestellt hat, dass jeder Block mit einer Mindestzahl Kopien im HDFS abgelegt wurde.

Bei Größen von bis zu mehreren hundert Knoten in einem HDFS-Cluster stellen Hardwareausfälle ein tägliches Vorkommnis dar, das es jedoch zu dokumentieren gilt. Deshalb schickt jeder DataNode in konfigurierbaren Abständen einen Heartbeat an den zentralen NameNode im HDFS, der sich für jeden DataNode den Zeitpunkt des letzten Heartbeats merkt. Bleibt er über einen längeren Zeitpunkt aus, erklärt der NameNode ihn für tot. Damit man durch den vermeintlichen Ausfall nicht die Mindestanzahl von Kopien pro Block unterschreitet und so die Wahrscheinlichkeit eines Datenverlusts ansteigt, weist der NameNode die DataNodes mit den verbleibenden Kopien an, sie auf andere DataNodes zu replizieren.

Etwas anders sieht es jedoch beim Ausfall des NameNode aus. Ihn gibt es pro HDFS-Instanz nur einmal, weshalb er einen "Single Point of Failure" im HDFS darstellt. Der NameNode speichert seine Informationen über die Metadaten des HDFS im Dateisystem seines Hosts ab. Das Abbild bezeichnet man als FsImage, das beim Start des NameNode vollständig von diesem in den Arbeitsspeicher geladen wird. Dadurch kann der NameNode Anfragen schnell beantworten. Um eine Beschädigung des FsImage durch eventuelle Schreiboperationen bei Änderungen an den Metadaten des HDFS zu vermeiden, werden Änderungen nur im Speicher gehalten und nicht augenblicklich persistiert. Die Aufgabe übernimmt der SecondaryNameNode, der jedem NameNode beigestellt ist. Er protokolliert alle Änderungen im EditLog. Startet ein NameNode, liest er sowohl das letzte FsImage als auch das EditLog ein und führt alle Änderungen aus dem EditLog am FsImage nach. Das aktualisierte FsImage speichert der NameNode, der erst anschließend seine Arbeit aufnimmt.

Neben dem bereitgestellten Befehlszeilen-Client, lässt sich über die Java-API aus jeder Java-Anwendung mit dem HDFS arbeiten.