Reaktive Anwendungen mit dem Reactor-Framework

Werkzeuge  –  2 Kommentare

Asynchrone, eventgetriebene Architekturen haben in den letzten Jahren stark an Bedeutung gewonnen. Ein Treiber für diese Entwicklung ist das Web mit ständig steigenden Anforderungen an Performanz und Skalierbarkeit. Eine neue Antwort auf diese Herausforderungen liefert das Reactor-Framework.

Verschiedene neue Frameworks erlauben die Implementierung eventgetriebener Architekturen für hochperformante Datenverarbeitung durch asynchrone Event-Verarbeitung sowie Non-Blocking IO. Die grundlegende Philosophie dieses Ansatzes beschreibt das im letzten Jahr veröffentlichte und mittlerweile in Version 2.0 aktualisierte Reactive Manifesto. Bekannte Vertreter sind Node.js und Rubys EventMachine, auf der JVM sind vor allem Vert.x, Akka und das Basis-Framework Netty zu nennen.

Wesentliches Grundkonzept ist das Reactor-Pattern, das namensgebend für das noch junge Framework Reactor von SpringSource war. Es ist:

  • ein Basis-Framework für asynchrone Anwendungen. Es wird als Bibliothek innerhalb einer Anwendung benutzt, bringt also im Gegensatz zu anderen reaktiven Frameworks wie Vert.x oder Node.js keine Laufzeitumgebung mit.
  • "event-driven" und "reactive".
  • auf hohe Performanz ausgelegt. Laut den Entwicklern soll die Verarbeitung von bis zu 20 Millionen Events pro Sekunde auf einem Quad-Core-Rechner möglich sein.

Als Open-Source-Projekt steht Reactor unter der Apache Software License 2.0. Die Version 1.0 wurde im November 2013 veröffentlicht. Nachstehende Beispiele benutzen die Version 1.1.0 aus dem Mai 2014.

Reactor als asynchroner Event-Bus

Den Kern von Reactor bildet ein asynchroner Event-Bus, der für das Routing und das Zustellen von Events zuständig ist. An ihm können sich Consumer registrieren und Bus-Nachrichten empfangen, die in Form von Events eintreffen. Ein erstes Hello-World-Beispiel zeigt die wesentlichen Grundelemente: Selektoren, Consumer und Events.

 1: public class HelloReactorWorld {
2: public static void main(String[] args) throws InterruptedException {
3: Reactor reactor = Reactors.reactor(new Environment());
4: CountDownLatch latch = new CountDownLatch(1);
5:
6: // Consumer wird registriert
7: reactor.on(
8: $("Greeting"),
9: (Event<String> ev) -> {
10: LoggingHelper.logInfoString(ev.getData());
11: latch.countDown();
12: });
13:
14: // Event wird generiert
15: LoggingHelper.logInfoString("Send Greeting...");
16: reactor.notify("Greeting", Event.wrap("Hello World"));
17:
18: latch.await();
19: }
20: }

In Zeile 3 wird zunächst eine neue Reactor-Instanz mit einer Standardumgebung erzeugt. Diese Instanz muss in der Anwendung allen Event-Produzenten und -Konsumenten zur Verfügung stehen. Es lassen sich zudem auch mehrere, voneinander unabhängige Reactor-Instanzen in einer Anwendung starten.

Mit reactor.on wird ein Consumer für Events bei der Reactor-Instanz registriert (Zeile 7-12, hier mit Java-8-Lambda-Ausdruck). Auf welche Events er reagiert, legt dessen Selektor fest. Reactor bringt fertige Selektoren mit, die unter anderem auf Klassentyp, reguläre Ausdrücke oder URL-Patterns vergleichen. Das Beispiel verwendet einen Objekt-Selektor (Zeile 7): $ ist dabei eine statische Methode als Kurzform für den Aufruf von Selectors.object. Dieser Selektor passt auf den Vergleich über equals für das Topic "Greeting", das somit als Bus-Adresse fungiert.

In Zeile 16 erzeugt reactor.notify ein Event an den Reactor: Im Beispiel wird als Event-Nutzdatum "Hello World" an die Adresse "Greeting" übergeben. Events können beliebige Objekte transportieren. Es empfiehlt sich, nur "immutable" Datentypen zu verwenden. Reactor ist ein asynchrones Framework. Nach Aufruf von notify beendet sich eigentlich das Programm, aber Event-Zustellung und -Verarbeitung sind zu diesem Zeitpunkt eventuell noch nicht erfolgt. Daher wird ein CountDownLatch eingesetzt.

Dispatcher in Reactor

Für Event-Zustellung und die Ausführung des Consumer ist in Reactor ein Dispatcher zuständig: Verschiedene Implementierungen stehen zur Auswahl, die für unterschiedliche Zwecke optimiert sind. Dispatcher stellen der Event-Verarbeitung einen Thread zur Verfügung. Wie viele Threads es gibt, wie diese verwaltet werden und wie genau die Verteilung auf Threads passiert, hängt von der gewählten Implementierung ab.

Standardmäßig nutzt Reactor einen Ringbuffer-Dispatcher. Damit werden Events in einem Ringbuffer gespeichert und mit nur einem einzelnen Thread abgearbeitet, und zwar anhand der Konzepte und Implementierungen des LMAX Disruptor (für dessen Performance siehe hier). Der Ringbuffer-Dispatcher ermöglicht einen hohen Durchsatz für Consumer, die nicht blockieren und nur kurz laufen. Das Ergebnis der Log-Ausgabe mit diesem Dispatcher ist folglich:

2014-07-31T09:58:20.090Z: [main] Send Greeting...
2014-07-31T09:58:20.216Z: [ringBuffer-9] Hello World

Da Consumer ihren Dispatcher-Thread bis zur vollständigen Abarbeitung nichtunterbrechbar belegen, nimmt man für blockierende und lang laufende Consumer besser eine Dispatcher-Variante, die Events mit mehr als einem Thread verarbeitet. Eine solche Variante ist der Thread-Pool-Dispatcher. Er lässt sich beim Erzeugen der Reactor-Umgebung mitgeben:

...
3: Reactor reactor = Reactors.reactor(new Environment(),
Environment.THREAD_POOL);
...
14: // Events werden generiert
15: LoggingHelper.logInfoString("Send Greeting...");
16: reactor.notify("Greeting", Event.wrap("Hello World"));
17: reactor.notify("Greeting", Event.wrap("Goodbye World"));
...

Schickt man nun zwei Events (Zeilen 16-17), werden diese durch zwei verschiedene Threads konsumiert. Die Reihenfolge ihrer Verarbeitung ist nicht festgelegt:

2014-08-01T07:14:44.983Z: [main] Send Greeting...
2014-08-01T07:14:45.124Z: [threadPoolExecutor-10] Hello World
2014-08-01T07:14:45.124Z: [threadPoolExecutor-11] Goodbye World