Spark versus Flink – Rumble in the (Big Data) Jungle

Batch Processing

Umgang beim Batch Processing

Im weiteren Verlauf werden die wichtigsten Konzepte beider Frameworks anhand eines einfachen Beispiels vorgestellt. Um zu verdeutlichen, wie Programme mit Spark beziehungsweise Flink aussehen, wird der komplette Code gezeigt. Zur besseren Vergleichbarkeit und wegen der größeren Verbreitung nutzen die Autoren die Java APIs beider Frameworks. Wer keine Berührungsängste zu Scala hat, sollte aber auf jeden Fall einen Blick auf die Scala APIs riskieren. Konzepte wie Case Classes und Pattern Matching machen den Scala-Code um einiges kürzer und eleganter als den in Java.

Das Beispiel zur Verarbeitung von Batch-Daten besteht daraus, eine CSV-Datei einzulesen, die eine Liste verkaufter Artikel enthält. In der Datei gibt es die Artikel-ID, die Anzahl verkaufter Artikel und das Verkaufsdatum. Aus den Daten soll abgeleitet werden, welcher Artikel pro Tag am häufigsten verkauft wurde. Das Beispiel ist relativ einfach gehalten, erlaubt aber einen Blick auf viele wichtige Operatoren.

In der Batch API von Spark gibt es ein Kernkonzept, das Resilient Distributed Dataset (RDD). Dabei handelt es sich um eine Datenstruktur, die sich zum einen auf einen Cluster verteilen lässt (distributed) und zum anderen Ausfallsicherheit bietet (resilient). Die Ausfallsicherheit ist so umgesetzt, dass man bereits berechnete Daten wieder aus den Quelldaten nachberechnen kann. Fällt ein Cluster-Knoten aus, übernehmen die anderen die Neuberechnung der verlorenen Daten. Mit Spark lässt sich auch ein Zwischenstand der Daten persistieren, womit dieser Datensatz für Nachberechnungen der neue Ausgangszustand wird.

In den letzten Versionen hat Spark neben dieser Abstraktion noch zwei weitere Batch APIs eingeführt, die Dataframes und die Datasets. Insbesondere Erstere bringen eine höhere Geschwindigkeit, jedoch zu Lasten der Typsicherheit. Die Dataset API ist erstmals in Spark 1.6 zu finden und soll mehr Typsicherheit als die Dataframes bringen, bei besserer Performance als bei den RDDs. Derzeit empfiehlt es sich allerdings, noch die zwei APIs aus Scala zu nutzen; bei beiden gibt es noch einige Einschränkungen in der Java API. Der Artikel verwendet die RDDs, da sie die historische Basis von Spark sind, aber auch weil andere Teile von Spark, etwa Streaming, darauf aufbauen. Es ist allerdings absehbar, dass die DataFrames und Datasets APIs die RDD API auch dort verdrängen werden.

Codebeispiel: Einlesen und Verarbeiten einer Datei

SparkConf conf = 
new SparkConf().setAppName("Spark Demo").setMaster("local[*]");
JavaSparkContext sparkContext = new JavaSparkContext(conf);

JavaRDD<String> salesFile = sparkContext.textFile(salesPath);
JavaPairRDD<Sale,Integer> salesWithAmount = salesFile
.map(line -> line.split(",")) // Splitten des CSV in einzelne Felder
.filter(array -> array.length == 3) // Rausfiltern ungültiger Zeilen
.mapToPair(parts -> { // Umwandeln der Werte in einen POJO
int id = Integer.parseInt(parts[0]), amount =
Integer.parseInt(parts[1]);
String date = parts[2];
return new Tuple2<Sale,Integer>(new Sale(id, date), amount);
});

// Addieren der einzelnen verkauften Mengen
JavaPairRDD<Sale, Integer> saleWithSum =
salesWithAmount.reduceByKey((x, y) -> x + y);

saleWithSum.rdd().toJavaRDD() // Umwandeln von JavaPairRDD nach JavaRDD
.sortBy(Tuple2::_2, false,1) // Sortieren nach der Menge
.foreach(tuple -> System.out.println(tuple._1() + ":
" + tuple._2())); // Ausgeben der Ergebnisse

Zu Beginn ist bei Spark ein SparkContext zu erstellen. Er lässt sich mit der SparkConf konfigurieren. So kann man den Namen der Applikation setzen, Speichergrößen festlegen, und vor allem den Master bestimmen. Er legt fest, wo das Programm ausgeführt wird – auf einem Cluster oder lokal in der JVM. Im Cluster-Fall gibt man hier die Adresse des Masters ein, ansonsten wie im Beispiel "local". Dahinter kann man noch die Parallelität festlegen. "*" bedeutet, dass Spark nach Belieben parallelisieren kann.

Spark erlaubt das Konsumieren von Daten aus verschiedenen Quellen: HDFS (Hadoop Distributed File System), S3, JDBC (Java Database Connectivity), aber auch aus normalen Textdateien wie im Beispiel. Entsprechend gibt es verschiedene Methoden, die für eine Datenquelle einen RDD zurückgeben, hier kommt textFile zum Einsatz. Wer einen RDD hat, kann auf ihm zwei Typen von Operationen ausführen: Transformationen (map, reduce, filter ...) und Aktionen (save, print ...). Erstere sind "lazy" und werden nur ausgeführt, wenn am Ende der Aufrufkette eine Aktion steht. So kann man die Aktion auskommentieren, und die ganze Operatorenkette wird nicht mehr durchgeführt.

Spark bietet für die RDD API keine Möglichkeit, CSV- oder JSON-Daten einfach einzulesen. Hier müssen Entwickler entweder, wie im obigen Beispiel, selbst die Daten extrahieren oder auf andere Libraries dafür zurückgreifen. Die Dataframes API bietet hingegen etwas mehr Komfort; unter anderem gibt es ein Plug-in zum direkten Einlesen von CSV-Dateien.

Da die Menge pro Artikel und Tag berechnet werden soll, fassen die Autoren diese zwei Felder in einem POJO (Plain Old Java Object) zusammen. Da sie immer zusammen behandelt werden, macht das den Code einfacher und aufgeräumter. Die verkaufte Menge kann nicht Teil des POJOs sein, da über diesen Wert summiert werden soll. Um alle Werte für einen Schlüssel (hier das Sale-Objekt) zu bekommen, benutzen Entwickler den reduceByKey-Operator. Er ist nur auf einem RDD über Tupel definiert. In der Java API gibt es dafür den Typ JavaPairRDD, den man über die mapToPair-Methode bekommt. Sie erwartet als Rückgabe aus dem Lambda ein Tuple2-Objekt. Es ist eine in Scala nativ vorliegende Klasse, die in der Spark API fest verwurzelt ist. Die mapToPair-Methode gibt es nur in der Java API, unter Scala stehen die byKey-Methoden automatisch für RDDs über ein Tuple zur Verfügung.

Auf dem JavaPairRDD lässt sich nun reduceByKey aufrufen. In einem Lambda wird übergeben, wie mit den Werten verfahren werden soll. In diesem Fall wird einfach aufsummiert. Damit gruppiert Spark nun den RDD nach gleichen Sale-Objekten und summiert die dazugehörigen Mengen. In der Verarbeitung im Cluster ist das eine potenziell kostspielige Operation. Bisher ließen sich alle Operationen (map und filter) für jeden Datensatz unabhängig, und damit parallel, voneinander durchführen. Für den reduce-Schritt werden nun aber Datensätze für die gleichen Schlüssel auf demselben Knoten benötigt. Im Cluster ist hierzu neu zu partitionieren. Das heißt, die Daten werden zwischen den Nodes hin und her kopiert. Bei großen Datenmengen und vielen Cluster-Knoten sollte man nicht ohne Not solche Operationen einsetzen.

Die Verarbeitung findet ihren Abschluss, indem die Daten erst sortiert und dann auf die Konsole ausgegeben werden. Da man auf dem JavaPairRDD nur nach dem Key sortieren kann, geschieht die Umwandlung über den Umweg des Scala RDD in den Java RDD.

Bei Flink ähnlich

Die DataSet API von Flink bietet ähnliche Funktionen wie Spark, doch beim Einlesen von Dateien verfügt die Technik über bequemere Funktionen. So lassen sich CSV-Dateien einfach einlesen und auf POJOs oder Tupel abbilden. Auch für häufig genutzte reduce-Funktionen bietet Flink eigene Methoden an, sodass der Code deutlich kürzer und verständlicher ausfällt.

final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

env.readCsvFile(salesPath)
.ignoreInvalidLines()
.types(Integer.class, Integer.class, String.class)
.groupBy(0,2) // Gruppieren nach ArtikelID und Datum
.sum(1) // Verkaufszahlen in diesen Gruppen aufsummieren
.groupBy(2) // Gruppieren nach Tag
.maxBy(1) // Pro Gruppe (also Tag) Artikel mit den meisten Sales
.print();

Bei Flink gibt es ebenfalls zwei Typen von Operatoren, sodass die Verarbeitung nur geschieht, wenn die Ergebnisse auch benötigt werden. Im Gegensatz zu Spark muss man bei Flink nicht im Code einstellen, wo und wie die Applikation laufen wird. Flink erkennt selbst, ob es lokal oder im Cluster läuft.

Auf Environment lassen sich auch Daten aus verschiedenen Quellen einlesen. Wie erwähnt gibt es hier aber eine Möglichkeit, CSV-Dateien zu verarbeiten. Da sie sich oft nur lose an den Standard halten, gibt es viele Mittel, bestimmte Parameter zu setzen, etwa den Separator oder eben, ob ungültige Zeilen gefiltert werden sollen. Zum Schluss lassen sich die Daten direkt in einem POJO umwandeln. In dem Fall werden die Felder einfach auf ein Tupel abgebildet. Hierzu ist nur anzugeben, welche Typen für die Felder zum Einsatz kommen. Als Ergebnis erhält man ein DataSet-Objekt, auf dem ähnliche Operationen möglich sind wie vorher bei Spark auf dem RDD.

Auch gibt man an, dass die ID und das Datum als Schlüssel genutzt werden sollen. In Flink erledigt man das mit groupBy und gibt beide relevanten Felder an. Danach kommt die sum-Funktion zum Einsatz, die auf einem GroupedDataSet definiert ist. Diese und einige weitere Funktionen werden als spezielle reduce-Funktionen direkt in der API angeboten, damit sich Benutzer etwas Code und Komplexität sparen können. Framework-seitige Optimierungen geraten so ebenfalls einfacher. Zum Schluss werden die Daten noch sortiert und ausgegeben.

Auch wenn die Lösung in beiden Frameworks ähnlich aufgebaut ist, bietet Flink subjektiv gesehen die schönere, ausdrucksstärkere API und mehr Komfort. Mit den neuen APIs wird Spark hier aber aufholen.