Verarbeiten großer verteilter Datenmengen mit Hadoop, ein Beispiel

Beispielanwendung

Anzeige

Der Job LogFileSearch beruht auf einem realen System, das die einzelnen Schritte der fachlichen Prozesse auf unterschiedlichen Server ausführt und bei dem sich dementsprechend die Logmeldungen in unterschiedlichen Logdateien verteilt befinden. Eine Anforderung war, dass sich bei Auftreten eines technischen Fehlers schnell nachvollziehen lässt, wo er seinen Ursprung hat. Trotz mehreren hundert Gigabyte an Logdateien pro Tag muss deshalb die Suche nach einzelnen Prozessen schnell durchgeführt werden können. Das einzige Zuordnungsmerkmal der Meldungen zueinander stellt eine gemeinsame Prozess-ID dar, die alle Server teilen. Zur Umsetzung der Aufgabe müssen daher alle zu einem Prozess gehörenden Meldungen von den Servern eingesammelt und in der Reihenfolge ihrer Erzeugung sortiert ausgegeben werden.

Der folgende Code zeigt den vereinfachten Aufbau einer einzelnen Logmeldung.

BP_0000000000/12 [main] INFO  node-0 - Prozesschritt 12/68

Am Anfang der Meldung stehen die Prozess-ID und die Schrittnummer. Mit den beiden Angaben kann eine MapReduce-Anwendung zusammengehörende Logmeldungen finden und in der richtigen Reihenfolge ausgeben.

Für Textdateien wie die zu durchsuchenden Logfiles lässt sich als Eingabeformat die Klasse TextInputFormat verwenden. Sie übergibt für jede gelesene Zeile die Zeilennummer als Schlüssel und die Zeile als Wert an die Mapper-Implementierung aus dem Listing 3.

public class LFSMapperSearch
extends Mapper<LongWritable, Text, LFSBPKey, Text> {
private LFSBPKey bpKey = new LFSBPKey();
private String searchID;
    protected void setup(Context context) throws IOException, 
InterruptedException {
super.setup(context);
       setSearchID(context.getConfiguration()
.get(LogFileSearch.LOG_FILE_SEARCH_BPID));
}
    protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String logLine = value.toString();
        int firstBlank = logLine.indexOf(" ");
int firstSlash = logLine.indexOf("/");
        String bpidWithoutPrefix = logLine.substring(3, firstSlash);
String bpidWithPrefix = logLine.substring(0, firstSlash);
String step = logLine.substring(firstSlash + 1, firstBlank);
        if (getSearchID().equals(bpidWithPrefix)) {
bpKey.set(Integer.valueOf(bpidWithoutPrefix),
Integer.valueOf(step));
context.write(bpKey, value);
}
}
    public void setSearchID(String id) {
searchID = id;
}
    public String getSearchID() {
return searchID;
}
}

Hadoop verwendet für die beiden zu übergebenden Werte die Datentypen LongWritable und Text, die unter anderem die Serialisierungsfunktionen bereitstellen, welche man für den Transfer zwischen den Clusterknoten benötigt.

Der Mapper ermittelt die Prozess-ID. Stimmt sie mit der gesuchten und über die Konfiguration übergebenen ID überein, reicht man die gefundene Zeile zusammen mit einem selbst erzeugten Schlüssel an das Framework zur weiteren Verarbeitung weiter. Alle anderen Zeilen werden verworfen.

Wie das Schlüssel-Wert-Paar, das bei der Eingabe verwendet wurde, müssen auch die vom Mapper erzeugten Intermediate-Paare Hadoop-eigene Datentypen benutzen. Reichen die über die API bereitgestellten Klassen nicht aus, lassen sich eigene Datentypen implementieren. Der Mapper nutzt das, um für seine Ausgabe einen eigenen Schlüssel zu erzeugen, der aus Prozess-ID und Schrittnummer besteht und den Listing 4 zeigt. Damit hat der Entwickler die Möglichkeit die spätere Sortierung der Werte durch Hadoop zu beeinflussen.

public class LFSBPKey
implements WritableComparable<LFSBPKey> {
    private static final int HASH_MULTIPLIER = 59;
    static {
WritableComparator.define(LFSBPKey.class, new Comparator());
}
    private int processID = 0;
private int step = 0;
    public void set(int bpid, int bpstep) {
processID = bpid;
step = bpstep;
}
    public int getProcessID() {
return processID;
}
    public int getStep() {
return step;
}
    public void readFields(DataInput in) throws IOException {
processID = in.readInt() + Integer.MIN_VALUE;
step = in.readInt() + Integer.MIN_VALUE;
}
    public void write(DataOutput out) throws IOException {
out.writeInt(processID - Integer.MIN_VALUE);
out.writeInt(step - Integer.MIN_VALUE);
}
    public int hashCode() {
return processID * HASH_MULTIPLIER + step;
}
    public boolean equals(Object right) {
if (right instanceof LFSBPKey) {
LFSBPKey r = (LFSBPKey) right;
return r.processID == processID && r.step == step;
} else {
return false;
}
}
    public static class Comparator extends WritableComparator {
public Comparator() {
super(LFSBPKey.class);
}
        public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
return compareBytes(b1, s1, l1, b2, s2, l2);
}
}
    public int compareTo(LFSBPKey o) {
if (processID != o.processID) {
return processID < o.processID ? -1 : 1;
} else if (step != o.step) {
return step < o.step ? -1 : 1;
} else {
return 0;
}
}

Vor der Übergabe der Ergebnisse der Map-Phase an die Reducer-Instanzen partitioniert das Framework die erzeugten Paare mit der erwähnten Partitioner-Implementierung und sortiert die erzeugten Partitionen anschließend. Daher muss jede eigene Implementierung eines Datentyps das von java.lang.Comparable ableitende und für die Sortierung benötigte Interface WriteComparable einbinden.

Die Implementierung der Map-Phase stellt sicher, dass das Framework nur die gesuchten Logmeldungen an die Reduce-Phase übergibt. Es überträgt von den Mappern gefundene Meldungen als vorsortierte Partitionen an die Reducer. Hier vereinigt es diese mit Mergesort. Sie werden nach Schlüsselwerten sortiert, damit sie so an die Reducer übergeben werden können. Da das Framework wesentliche Aufgaben wie die Sortierung übernimmt, bleibt für die in Listing 5 gezeigte Reducer-Klasse nur noch die Ausgabe der gefundenen Logmeldungen. Den Key verwirft der Entwickler, da er ihn nur zur Sortierung benötigt hat und der Schlüssel in der Ergebnisdatei nicht vorhanden sein soll.

public class LFSReducer
extends Reducer<LFSBPKey, Text, Text, Text> {
    protected void reduce(LFSBPKey key, Iterable<Text> 
values, Context context)
throws IOException, InterruptedException
{
for(Text value: values) {
context.write(null, value);
}
}
}

Listing 6 zeigt abschließend die Klasse LogFileSearch zur Konfiguration des Jobs mit den Klassen Configuration und Job. Die Klasse LogFileSearch muss man als normales Java-Programm erstellen, damit das Framework sie ausführen kann. main() wertet zuerst die Befehlszeilenargumente aus, wobei der Entwickler drei Argumente für den Job benötigt: den HDFS-Pfad zum Verzeichnis mit der zu durchsuchenden Logdatei, den Ausgabepfad im HDFS und die zu suchende Prozess-ID. Letztere übergibt man an die Configuration-Instanz, womit sie allen Mappern über das Framework zur Verfügung steht. Anschließend lässt sich über den Job dem Framework mitteilen, welche Implementierung für welche Aufgabe zu nutzen ist. Da man alle gefundenen Logmeldungen in einer Datei zusammenfassen soll, darf Hadoop nur eine Reducer-Instanz starten. Abschließend lassen sich die Ein- und Ausgabepfade konfigurieren und der Job ausführen.

Anzeige