Verarbeiten großer verteilter Datenmengen mit Hadoop, ein Beispiel

Werkzeuge  –  Kommentare

Standen bei einem früheren Artikel das MapReduce-Programmiermodell sowie die Apache-Implementierung Hadoop im Zentrum, führt heise Developer nun in die Programmierung von MapReduce-Anwendungen mit dem Framework ein. Beispielhaft sei ein MapReduce-Job entwickelt, auf dessen Grundlage der Leser erste eigene Jobs umsetzen kann.

Das Hadoop-Framework übernimmt einen Großteil der für einen MapReduce-Job benötigten Arbeiten, sodass sich der Entwickler in den meisten Fällen auf die Implementierung der Map- und der Reduce-Funktion konzentrieren kann. Die Abbildung 1 stellt das Zusammenspiel zwischen dem Entwickler und dem Hadoop-Framework dar. Im ersten Schritt teilt der Entwickler dem Framework mit, wo es im Hadoop Distributed File System (HDFS) die zu verarbeitenden Daten findet und mit welchem Eingabeformat es diese einlesen soll (Punkt 1).

Zusammenspiel zwischen Entwickler und Hadoop (Abb. 1)

Zu beachten ist an diesem Punkt, dass Hadoop jeden Block einer Datei komplett in den Speicher einliest, unabhängig davon, wie viele der enthaltenen Datensätze es verarbeiten soll. Dabei sei eine Prämisse vorangestellt: Durch die wesentlich höhere Geschwindigkeit des Hauptspeichers im Vergleich zur Lesegeschwindigkeit von Festplatten ist es effizienter, einen ganzen Block in den Hauptspeicher einzulesen und nicht verwendete Daten einfach zu verwerfen, statt die benötigten Blockabschnitte auf der Festplatte zu suchen. Das ist der Grund für das Fehlen einer fseek()-ähnlichen Funktion, wodurch unter anderem das Einfügen von Daten in bestehende Dateien nicht möglich ist. Da man somit keinen Random Access (Direktzugriff) auf Daten realisieren kann, muss Hadoop alle Daten in der bereitgestellten Reihenfolge verarbeiten. Das Verfahren bezeichnet man als Streaming Access.

Hadoop stellt Entwicklern fertige Klassen zur Verfügung, die das Einlesen der Daten aus dem HDFS und ihre Aufbereitung in die für die Map-Funktion benötigten Key-Value-Paare abnehmen. Lediglich den Pfad im HDFS zu den zu verarbeitenden Dateien muss der Anwender angeben, damit das Framework die zu lesenden Dateien bestimmen kann. Sind sie festgelegt, ermittelt das Framework die zu lesenden Blöcke auf den DataNodes im Cluster und legt fest, wie viele Map-Tasks durch TaskTracker auf welchen Knoten auszuführen sind. Damit folgt Hadoop dem Lokalitätsbestreben von MapReduce, die Berechnung immer bei den Daten auszuführen. Mit dem gewählten Eingabeformat teilt das Framework die Blöcke in Splits auf und erzeugt die Schlüssel-Wert-Paare für den Mapper. Hadoop startet mit diesen Informationen und der gegebenen Mapper Implementierung die TaskTracker für die Map-Phase (Punkte 3 und 4).

Da die Ausgabe der Mapper gleichzeitig die Eingabe für die Reduce-Phase darstellt, muss Hadoop als Nächstes für jedes ausgegebene Paar entscheiden, an welchen der Reducer es das Paar schicken soll. Hadoops Standardimplementierung stellt über den Hash-Wert des Schlüssels der erzeugten Schlüssel-Wert-Paare sicher, dass alle Werte mit dem gleichen Wert an dieselbe Reducer-Instanz gehen. Ein abweichendes Verhalten lässt sich über eine eigene Implementierung des Interfaces Partitioner erreichen (Punkte 5 und 6).

Der Entwickler kann neben der Implementierung die Anzahl zulässiger Reducer-Instanzen bestimmen (Punkt 7). Da die Reducer die endgültige Ausgabe des Jobs erzeugen und im HDFS ablegen, bedeutet jede Instanz eine Ausgabedatei mit einem Teilergebnis des Jobs. Möchte man das Ergebnis nur in einer Datei ausgeben, lässt sich das am einfachsten durch die Beschränkung auf einen Reducer erreichen. Andere Wege sind durch zusätzliche Verarbeitungsschritte ebenfalls denkbar. Analog zum Eingabeformat muss der Hadoop-Entwickler Ausgabeformat und -ort im HDFS angeben.

Entwicklung von MapReduce-Anwendungen

Seit Hadoop 0.20.1 gibt es eine neue API für MapReduce-Anwendungen, sodass zuerst zu entscheiden ist, ob man mit der alten oder der neuen API programmieren möchte. Bei Neuentwicklungen sollte man der neuen den Vorrang geben, wenn sich auch noch die meisten Beispiele im Internet auf die alte API beziehen. Die Beispiele in diesem Artikel verwenden nur die neue API.

Die Hauptaufgabe für die Entwicklung einer MapReduce-Anwendung besteht in der Umsetzung der Mapper- und der Reducer-Funktion. Hierfür stellt Hadoop in der aktuellen Version die generischen Klassen Mapper und Reducer in einer Default-Implementierung als Identitätsfunktion zur Verfügung, wie sie Listing 1 und 2 auszugsweise zeigen. Eigene Implementierungen sind von den Klassen abzuleiten.

Listing 1
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
protected void map(KEYIN key, VALUEIN value,
Context context)
throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
Listing 2
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
protected void reduce(KEYIN key, Iterable<VALUEIN> values,
Context context)
throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
}

Nach der Umsetzung von Mapper und Reducer muss die Konfiguration des MapReduce-Jobs erfolgen, wofür das Framework zwei zentrale Klassen bereitstellt: Job und Configuration. Über Letztere kann der Entwickler jobspezifische Parameter an die Mapper- und Reducer-Implementierung übergeben oder Defaultwerte des Hadoop-Frameworks verändern. Mit der Klasse Job teilt der Entwickler dem Framework mit, welche Implementierung es für die einzelnen Teilaufgaben des Jobs verwenden werden soll.

Listing 6 zeigt die Konfiguration des Beispiel-Jobs. Zu beachten ist, dass man dem Job immer Klassen und keine konkreten Objekte übergibt, da die konkreten Instanzen erst zur Ausführungszeit auf den unterschiedlichen Knoten instanziiert werden.


Die Beispielanwendung

Der Job LogFileSearch beruht auf einem realen System, das die einzelnen Schritte der fachlichen Prozesse auf unterschiedlichen Server ausführt und bei dem sich dementsprechend die Logmeldungen in unterschiedlichen Logdateien verteilt befinden. Eine Anforderung war, dass sich bei Auftreten eines technischen Fehlers schnell nachvollziehen lässt, wo er seinen Ursprung hat. Trotz mehreren hundert Gigabyte an Logdateien pro Tag muss deshalb die Suche nach einzelnen Prozessen schnell durchgeführt werden können. Das einzige Zuordnungsmerkmal der Meldungen zueinander stellt eine gemeinsame Prozess-ID dar, die alle Server teilen. Zur Umsetzung der Aufgabe müssen daher alle zu einem Prozess gehörenden Meldungen von den Servern eingesammelt und in der Reihenfolge ihrer Erzeugung sortiert ausgegeben werden.

Der folgende Code zeigt den vereinfachten Aufbau einer einzelnen Logmeldung.

BP_0000000000/12 [main] INFO  node-0 - Prozesschritt 12/68

Am Anfang der Meldung stehen die Prozess-ID und die Schrittnummer. Mit den beiden Angaben kann eine MapReduce-Anwendung zusammengehörende Logmeldungen finden und in der richtigen Reihenfolge ausgeben.

Für Textdateien wie die zu durchsuchenden Logfiles lässt sich als Eingabeformat die Klasse TextInputFormat verwenden. Sie übergibt für jede gelesene Zeile die Zeilennummer als Schlüssel und die Zeile als Wert an die Mapper-Implementierung aus dem Listing 3.

Listing 3
public class LFSMapperSearch
extends Mapper<LongWritable, Text, LFSBPKey, Text> {
private LFSBPKey bpKey = new LFSBPKey();
private String searchID;
    protected void setup(Context context) throws IOException, 
InterruptedException {
super.setup(context);
       setSearchID(context.getConfiguration()
.get(LogFileSearch.LOG_FILE_SEARCH_BPID));
}
    protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String logLine = value.toString();
        int firstBlank = logLine.indexOf(" ");
int firstSlash = logLine.indexOf("/");
        String bpidWithoutPrefix = logLine.substring(3, firstSlash);
String bpidWithPrefix = logLine.substring(0, firstSlash);
String step = logLine.substring(firstSlash + 1, firstBlank);
        if (getSearchID().equals(bpidWithPrefix)) {
bpKey.set(Integer.valueOf(bpidWithoutPrefix),
Integer.valueOf(step));
context.write(bpKey, value);
}
}
    public void setSearchID(String id) {
searchID = id;
}
    public String getSearchID() {
return searchID;
}
}

Hadoop verwendet für die beiden zu übergebenden Werte die Datentypen LongWritable und Text, die unter anderem die Serialisierungsfunktionen bereitstellen, welche man für den Transfer zwischen den Clusterknoten benötigt.

Der Mapper ermittelt die Prozess-ID. Stimmt sie mit der gesuchten und über die Konfiguration übergebenen ID überein, reicht man die gefundene Zeile zusammen mit einem selbst erzeugten Schlüssel an das Framework zur weiteren Verarbeitung weiter. Alle anderen Zeilen werden verworfen.

Wie das Schlüssel-Wert-Paar, das bei der Eingabe verwendet wurde, müssen auch die vom Mapper erzeugten Intermediate-Paare Hadoop-eigene Datentypen benutzen. Reichen die über die API bereitgestellten Klassen nicht aus, lassen sich eigene Datentypen implementieren. Der Mapper nutzt das, um für seine Ausgabe einen eigenen Schlüssel zu erzeugen, der aus Prozess-ID und Schrittnummer besteht und den Listing 4 zeigt. Damit hat der Entwickler die Möglichkeit die spätere Sortierung der Werte durch Hadoop zu beeinflussen.

Listing 4
public class LFSBPKey
implements WritableComparable<LFSBPKey> {
    private static final int HASH_MULTIPLIER = 59;
    static {
WritableComparator.define(LFSBPKey.class, new Comparator());
}
    private int processID = 0;
private int step = 0;
    public void set(int bpid, int bpstep) {
processID = bpid;
step = bpstep;
}
    public int getProcessID() {
return processID;
}
    public int getStep() {
return step;
}
    public void readFields(DataInput in) throws IOException {
processID = in.readInt() + Integer.MIN_VALUE;
step = in.readInt() + Integer.MIN_VALUE;
}
    public void write(DataOutput out) throws IOException {
out.writeInt(processID - Integer.MIN_VALUE);
out.writeInt(step - Integer.MIN_VALUE);
}
    public int hashCode() {
return processID * HASH_MULTIPLIER + step;
}
    public boolean equals(Object right) {
if (right instanceof LFSBPKey) {
LFSBPKey r = (LFSBPKey) right;
return r.processID == processID && r.step == step;
} else {
return false;
}
}
    public static class Comparator extends WritableComparator {
public Comparator() {
super(LFSBPKey.class);
}
        public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
return compareBytes(b1, s1, l1, b2, s2, l2);
}
}
    public int compareTo(LFSBPKey o) {
if (processID != o.processID) {
return processID < o.processID ? -1 : 1;
} else if (step != o.step) {
return step < o.step ? -1 : 1;
} else {
return 0;
}
}

Vor der Übergabe der Ergebnisse der Map-Phase an die Reducer-Instanzen partitioniert das Framework die erzeugten Paare mit der erwähnten Partitioner-Implementierung und sortiert die erzeugten Partitionen anschließend. Daher muss jede eigene Implementierung eines Datentyps das von java.lang.Comparable ableitende und für die Sortierung benötigte Interface WriteComparable einbinden.

Die Implementierung der Map-Phase stellt sicher, dass das Framework nur die gesuchten Logmeldungen an die Reduce-Phase übergibt. Es überträgt von den Mappern gefundene Meldungen als vorsortierte Partitionen an die Reducer. Hier vereinigt es diese mit Mergesort. Sie werden nach Schlüsselwerten sortiert, damit sie so an die Reducer übergeben werden können. Da das Framework wesentliche Aufgaben wie die Sortierung übernimmt, bleibt für die in Listing 5 gezeigte Reducer-Klasse nur noch die Ausgabe der gefundenen Logmeldungen. Den Key verwirft der Entwickler, da er ihn nur zur Sortierung benötigt hat und der Schlüssel in der Ergebnisdatei nicht vorhanden sein soll.

Listing 5
public class LFSReducer
extends Reducer<LFSBPKey, Text, Text, Text> {
    protected void reduce(LFSBPKey key, Iterable<Text> 
values, Context context)
throws IOException, InterruptedException
{
for(Text value: values) {
context.write(null, value);
}
}
}

Listing 6 zeigt abschließend die Klasse LogFileSearch zur Konfiguration des Jobs mit den Klassen Configuration und Job. Die Klasse LogFileSearch muss man als normales Java-Programm erstellen, damit das Framework sie ausführen kann. main() wertet zuerst die Befehlszeilenargumente aus, wobei der Entwickler drei Argumente für den Job benötigt: den HDFS-Pfad zum Verzeichnis mit der zu durchsuchenden Logdatei, den Ausgabepfad im HDFS und die zu suchende Prozess-ID. Letztere übergibt man an die Configuration-Instanz, womit sie allen Mappern über das Framework zur Verfügung steht. Anschließend lässt sich über den Job dem Framework mitteilen, welche Implementierung für welche Aufgabe zu nutzen ist. Da man alle gefundenen Logmeldungen in einer Datei zusammenfassen soll, darf Hadoop nur eine Reducer-Instanz starten. Abschließend lassen sich die Ein- und Ausgabepfade konfigurieren und der Job ausführen.


Listing 6
public class LogFileSearch {
public static final String LOG_FILE_SEARCH_BPID
= "logfilesearch.bpid";
    public static void main(String[] args)
throws IOException, ClassNotFoundException,
InterruptedException
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,
args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println(LogFileSearch.class.getSimpleName()
+ ": <in> <out> <ID>");
System.exit(-1);
}
        /* Setting the BPID as parameter to be passed to all instances. */
conf.set(LOG_FILE_SEARCH_BPID, otherArgs[2]);
        Job job = new Job(conf, LogFileSearch.class.getSimpleName());
        job.setJarByClass(LogFileSearch.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(LFSMapperSearch.class);
job.setOutputKeyClass(LFSBPKey.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(LFSReducer.class);
job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileInputFormat.setInputPathFilter(job, LFSPathFilter.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Zur Ausführung des Jobs sind alle Klassen in einer Jar zu binden. Sie lassen sich über das Shell-Skript bin/hadoop mit dem folgenden Aufruf ausführen:

bin/hadoop -jar/pfad/zum/jar JobKlasse
zusätzlicheParameter

Hadoop-Jobs sind de facto Java-Anwendungen und lassen sich mit jeder bekannten IDE oder auch einem einfachen Texteditor erstellen. Einen integrierten Ansatz verfolgt Karmasphere Studio for Hadoop, das auf NetBeans-Basis eine Entwicklungsumgebung für Hadoop-Jobs und einen JobManager für ihre Ausführung im Cluster bietet.

Fazit

Erste MapReduce-Jobs sind leicht zu realisieren und die programmiertechnischen Hürden gering, da für die ersten Schritte die Standardklassen der Hadoop-API ausreichend sind. Darüber hinaus gehende Anforderungen wie eigene Eingabe- und Ausgabeformate können Entwickler über vorgegebene Interfaces implementieren. Ebenso haben sie die Möglichkeit, das Verhalten von Hadoop über Eigenimplementierungen und deren Angabe in den Konfigurationsdateien an eigene Bedürfnisse anzupassen.

Den positiven Punkten gegenüber steht eine noch unberechenbare API. Die Änderungen in der Version 0.20.1 sind nicht vollständig kompatibel zu den Vorgängerversionen. Auch für die Zukunft dürften Änderungen zu erwarten sein, da Hadoop aktiv weiterentwickelt wird. Die sich daraus ergebenden Fragen sind Diskussionsgegenstand auf den Mailinglisten des Projekts, die zu verfolgen empfehlenswert ist.

Doch sollten die Punkte einem Einsatz nicht im Wege stehen, da Hadoop Aufgaben bearbeitet, für die es bis jetzt nur wenige Alternativen gibt, die hinsichtlich Nutzergemeinde und Verbreitung vergleichbar fortgeschritten sind.

Oliver Fischer
arbeitet in Berlin und ist über www.swe-blog.net zu erreichen. Für wertvolle Kritik und Ergänzungen zum Artikel bedankt er sich bei Isabel Drost.