Reactive Programming – vom Hype zum Praxiseinsatz

Architektur/Methoden  –  9 Kommentare

Das Thema "Reactive" ist en vogue. Doch was genau versteht man hinter diesem Programmierstil? Und wie entwickelt man etwas, sodass es als "reactive" gelten darf?

Mitte Juli 2013 wurde das "Reactive Manifesto" veröffentlicht. Mittlerweile haben es eüber 3000 Entwickler unterzeichnet – Tendenz stark steigend. Zunächst einmal soll dem Manifest zufolge eine Anwendung jederzeit auf Stimuli reaktionsfähig sein, damit man sie "reactive" nennen kann. Dazu gehört, dass die Anwendung interaktiv, fehlertolerant und skalierbar sein muss. Insbesondere bedeutet es, dass die Anwendung ereignisgesteuert ("event-driven") sein soll. Das ist nicht weiter überraschend. Jede Bedienoberfläche ist fast zwangsläufig ereignisgesteuert, wenn sie nicht komplett auf Polling setzen will. Die Frage ist allerdings, wie viele Bedienoberflächen auf ein Benutzerereignis tatsächlich jederzeit ohne eine vom Benutzer wahrnehmbare Verzögerung reagieren können. Allzu oft findet man hier leider Gegenbeispiele.

Denn zur echten ereignisgesteuerten Anwendung gehört neben dem einfachen Bearbeiten von Ereignissen eben auch, dass dies asynchron geschieht und die Anwendung niemals blockieren darf. Das sind zwei Aspekte, die in der Praxis schwierig umzusetzen sind. Wie kommt nun ein Entwickler schrittweise von einer synchronen, blockierenden API zu einer asynchronen, nicht blockierenden?

Schritt für Schritt asynchron

Als Beispiel kann eine einfache API zum Abfragen von Filmen dienen. Solange sie nur lokal genutzt wird, genügt es sicherlich, sie synchron zu formulieren (im Beispiel in Scala):

trait User
trait Video
trait Rating

trait VideoService {
def getVideos(user: User): List[Video]
def getRating(video: Video): Rating
})

Wenn die API für Abfragen über das Internet verwendet werden soll, kann die einzelne Abfrage etwas dauern. In dieser Zeit darf die Anwendung nicht blockieren. Ein erster naiver Ansatz hierfür ist die Verwendung von Futures:

trait User
trait Video
trait Rating

trait VideoService {
def getVideos(user: User): Future[List[Video]]
def getRating(video: Video): Future[Rating]
}

Dabei muss man sich für ein geeignetes Future entscheiden. An Futures gibt es leider inzwischen viele zur Auswahl. Wer von Java kommt, ist vielleicht das Future aus der Java-Concurrency gewohnt. Zu diesem heißt es oft: "Future.get() is the most important method." Diese Aussage leitet leider in die Irre: Wenn man das Future zusammen mit seiner get()-Methode verwendet, ist nicht viel gewonnen, denn diese Methode blockiert wieder den Thread. Und das ist genau das, was zu vermeiden ist ...

Ab in die Callback-Hölle!

Mit beispielsweise der Guava-Bibliothek ist man besser aufgestellt: Die darin enthaltene Futures-Klasse liefert Callbacks, die aufgerufen werden, wenn der asynchrone Aufruf ein Ergebnis hat. Das ist angenehm, weil nirgendwo zu blockieren ist, wenn die Ergebnisbehandlung in den Callback verlegt wurde. Es wird allerdings problematisch, wenn der Entwickler asynchrone Aufrufe kombinieren und ineinander schachteln will, denn verschachtelte Callbacks werden schnell unübersichtlich und unverständlich. Das passiert aber recht schnell, wenn man etwa zunächst Übersichtsinformationen holt, um dann jeweils Detailinformationen nachzuholen, wie im folgenden Beispiel beim Holen von Video-Bewertungen zu sehen:

import java.util.concurrent._
import Executors._
import com.google.common.util.concurrent._

case class User(name: String, videos: List[Video])
case class Video(id: String, rating: Rating)
case class Rating(rating: Int)

trait VideoService {
def getVideos(user: User): Future[List[Video]]
def getRating(video: Video): Future[Rating]
}

object Videos extends VideoService {
def getVideos(user: User) = async(user.videos)
def getRating(video: Video) = async(video.rating)

private val executor = newSingleThreadExecutor

private def async[T](op: => T) = {
val task = ListenableFutureTask.create(
new Callable[T] { def call: T = op })
executor submit task
task
}

def shutdown = executor.shutdown
}

object Main extends App {
val videos = Videos.getVideos(
User("joe", List(Video("V1", Rating(9)),
Video("V2", Rating(3)))))

Futures addCallback (videos,
new FutureCallback[List[Video]] {

def onSuccess(videos: List[Video]): Unit = for {
video <- videos
rating = Videos.getRating(video)
} {
Futures addCallback (rating,
new FutureCallback[Rating] {

def onSuccess(rating: Rating): Unit =
println(s"Rating: $rating")

def onFailure(e: Throwable): Unit =
println("getRating: Failure!")
})
}

def onFailure(e: Throwable): Unit =
println("getVideos: Failure!")
})

Videos.shutdown
}

Es liegt in der Natur der Sache, dass asynchrone APIs viele asynchrone Aufrufe miteinander kombinieren wollen. Die sogenannte "Callback-Hölle" scheint vorprogrammiert zu sein, oder mit den Worten von Evan Czaplicki, dem Erfinder der (Reactive-)Programmiersprache Elm: "Callbacks are the modern 'goto'." Man braucht also etwas Besseres als Callbacks. Dabei ist die wesentliche Anforderung, dass es sich beliebig kombinieren und schachteln lassen muss.

Implementierungen

Plötzlich: Monaden

Hier kommen die sogenannten Monaden ins Spiel, die in der funktionalen Programmierung verwendet werden, um aus kleinen Bauteilen bequem größere Module zu komponieren. Hinter Monaden steckt trotz des abstrakten Namens keine Zauberei. Um aus den Futures Monaden zu machen, müssen nur wenige Forderungen erfüllt sein: Erstens braucht der Entwickler eine Methode, um Werte in Futures einzupacken, zweitens eine Methode, um mit Werten in Futures Berechnungen anzustellen, ohne diese via get() aus dem Kontext auspacken zu müssen. Diese heißt zum Beispiel bei Scala map und bei .NET/LINQ Select. Und drittens braucht er eine Methode, um asynchrone Aufrufe aneinanderketten zu können (was dem Schachteln von Callbacks entspricht). Diese heißt bei Scala flatMap und bei .NET/LINQ SelectMany.

So ein monadisches Future liefert Scala glücklicherweise gleich mit. Sieht das Beispiel von oben mit Scala-Futures nun besser aus?

import java.util.concurrent.{TimeUnit, Executors}
import Executors.newSingleThreadExecutor
import scala.concurrent.{ExecutionContext, Future}

case class User(name: String, videos: List[Video])
case class Video(id: String, rating: Rating)
case class Rating(rating: Int)

trait VideoService {
def getVideos(user: User): Future[List[Video]]
def getRating(video: Video): Future[Rating]
}

object Videos extends VideoService {
implicit val ctx = ExecutionContext
.fromExecutorService(newSingleThreadExecutor)

def getVideos(user: User) = async(user.videos)
def getRating(video: Video) = async(video.rating)

private def async[T](op: => T) = Future(op)
}

object Main extends App {
import Videos.ctx

val futureVideos = Videos.getVideos(
User("joe", List(Video("V1", Rating(9)),
Video("V2", Rating(3)))))

val futureRatings = futureVideos map { videos =>
videos map Videos.getRating
} // liefert Future[List[Future[Rating]]] - und nun?

// Callbacks für die Ausgabe
futureRatings onSuccess { case ratings =>
ratings foreach (_ onSuccess { case rating =>
println(rating)
})
}

ctx.awaitTermination(1L, TimeUnit.SECONDS)
ctx.shutdown
}

Ein sich daraus ergebender Vorteil ist, dass die Berechnung nun von der Ausgabe getrennt ist, ohne dass Variablen zu verändern wären. Damit kommen die Callbacks nur noch ganz am Ende zum Einsatz, um die Ergebnisse auf die Konsole auszugeben.

Aber das eigentliche Ziel, dass der Entwickler die Aufrufe möglichst schön zusammensetzen kann, ist leider nicht erreicht. Das Problem ist jetzt, dass Future und List zwei unterschiedliche Monaden sind, die gemischt verwendet werden. Das verkompliziert die Angelegenheit stark, sodass man von einer einfachen Benutzung der API noch weit entfernt sind. Und abgesehen davon hat die API eine weitere Schwäche: Sie liefert nämlich die Liste der Videos zwar asynchron, aber als ein großes Gesamtpaket. Schöner wäre es, die Videos asynchron jeweils zu bekommen, sobald sie zur Verfügung stehen, sprich: als Datenstrom.

Ab hier helfen die Future-Varianten nicht mehr weiter, denn Futures sind auf die Behandlung asynchroner Einzelwerte ausgelegt. Es liegt hier aber ein asynchroner Datenstrom vor, also eine asynchrone Folge von Einzelwerten, die man in ihrer Gesamtheit betrachten möchte. Dafür benötigt der Entwickler eine neue Abstraktion, die genau darauf ausgelegt ist: das "Observable". Dieses ist Grundlage für die sogenannten Reactive Extensions (Rx), die es für .NET schon länger gibt und die Microsoft im November 2012 als Open-Source-Software freigegeben hatte. Rx gibt es neben der .NET-Variante auch noch als RxJS für JavaScript. In der Java-Welt fand man leider lange Zeit nichts Vergleichbares – in jüngster Vergangenheit sind aber einige Projekte entstanden, die in unterschiedlichsten Varianten versuchen, Rx auf die Java Virtual Machine zu portieren. Eines der derzeit erfolgversprechendsten ist RxJava, das Netflix für die interne API in großem Umfang (mehr als zwei Milliarden API-Requests pro Tag) produktiv einsetzt.

Doch zurück zum Observable. Was ist das überhaupt? Zunächst einmal ist es einfach etwas, bei dem man einen Observer registrieren (und über eine Subscription auch deregistrieren) kann. Der Observer wiederum ist ein Interface, das aus drei Callback-Methoden besteht: einer für das Behandeln des jeweils nächsten Werts, einer für Fehler und einer für das Beenden:

trait Observable[+A] {
def subscribe(observer: Observer[A]): Subscription
}

trait Observer[-A] {
def onNext(value: A): Unit
def onError(error: Throwable): Unit
def onCompleted: Unit
}

trait Subscription {
def unsubscribe: Unit
}

Das liest sich zunächst unspektakulär. Parallelen zum synchronen Gegenstück des Iterable fallen auf: Es definiert im Wesentlichen einen Iterator. Und der wiederum ist dazu da, jeweils nächste Werte zu liefern. Fehler verursachen im synchronen Fall einfach Exceptions, und wann er fertig ist, kann man ihn auch fragen. Der Iterator ist dem Observer also sehr ähnlich. Tatsächlich sind das duale Konzepte. Doch so schön das in der Theorie sein mag, stellt sich die Frage, inwiefern diese einfache Definition, die im Wesentlichen ausgerechnet drei Callbacks bereitstellt, aus der Callback-Hölle führen soll?

Rx-Operatoren

Mit Rx-Operatoren aus der Callback-Hölle

Hierbei helfen die vielen in Rx auf dem Observable definierten Operatoren, die denen auf Iterable definierten in ihrer Verwendung ähnlich sind. Für einfache Transformationen der Werte, die in Observables stecken, gibt es beispielsweise den map-Operator. Er nimmt als Parameter eine Transformationsfunktion.

Transformieren von Observables via "map" (Abb. 1)

Hier ist map im Einsatz mit Hilfe von RxJava:

import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
chars.map(_.toUpperCase).subscribe(onNext = println)
// liefert "A", "B", "C"

Spannender werden Operatoren, sobald sie mehrere Observables kombinieren sollen. Wenn der Entwickler beispielsweise zwei Observables vom selben Typ hat, die er zu einem einzelnen zusammenfügen will, hat er dafür den merge-Operator:

Einfaches Zusammenfügen zweier Observables vom selben Typ (Abb. 2)
import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
val caps = Observable("A", "B", "C")
chars merge caps subscribe(onNext = println)
// liefert "a", "b", "c", "A", "B", "C"

Oft hat man mehrere unterschiedliche Observables, die kombiniert werden wollen. Im Videobeispiel wäre es etwa denkbar, dass man zu jedem Video jeweils Bewertungen und Kommentare kombinieren möchte. In so einem Fall kommt der zip-Operator ins Spiel. Er verwandelt mit einer Transformationsfunktion, die auf den einzelnen Observable-Werten arbeitet, zwei Eingabe- in ein Ausgabe-Observable. Dabei sorgt zip dafür, dass die Observables ordentlich verzahnt sind.

import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
val nums = Observable(1, 2, 3)
chars zip nums subscribe(onNext = println)
// liefert ("a", 1), ("b", 2), ("c", 3)

Will man die Verzahnung nicht, weil die Werte der beiden Eingabe-Observables nicht direkt zusammenhängen, gibt es dafür wieder andere ähnliche Operatoren wie combineLatest. Im Fall von Scala, wo es Tupel gibt, werden die Eingabe-Observables direkt zu einem Tupel kombiniert, statt eine Transformationsfunktion anzuwenden. Letzteres wäre mit map in einem weiteren nachgelagerten Schritt möglich, wenn der Entwickler es benötigt.

Zwei verschiedene Observables zu einem verzahnten umwandeln (Abb. 3)

Zu guter Letzt sei noch der Operator flatMap (auch ähnlich wie in .NET selectMany oder mapMany genannt) vorgestellt, der dazu dient, zwei geschachtelte Observables zu einem umzuwandeln. Er behandelt den Fall, dass der Entwickler zu jedem einzelnen Wert eines Eingabe-Observables eine Funktion aufrufen kann, die jeweils ein weiteres Observable zurückliefert. All diese Rückgabe-Observables werden vom flatMap-Operator in einem Ausgabe-Observable gesammelt.

Aufsammeln von Sekundär-Observables via "flatMap"/"mapMany" (Abb. 4)

Wie die Ergebnis-Observables von flatMap hintereinander gehängt werden, zeigt das folgende Listing:

import rx.lang.scala.Observable
val chars = Observable("a", "b", "c")
val addUpper = (s: String) => Observable(s, s.toUpperCase)
chars flatMap addUpper subscribe(onNext = println)
// liefert "a", "A", "b", "B", "c", "C"

Das Video-Beispiel ist ein typischer Fall für den flatMap-Operator: Die Videos werden als Observable geholt, und zu jedem Video wird eine Funktion aufgerufen, die asynchron die Bewertung holen soll. Diese Funktion liefert die Bewertung ebenfalls als Observable zurück. Das bedeutet, dass pro Video ein verschachteltes Observable mit Bewertungen zurückgeliefert wird (auch wenn im Beispiel je nur eine Bewertung zu erwarten ist). flatMap hilft, daraus ein einzelnes Observable zu erzeugen, das die Bewertungen der Videos nacheinander zurückliefert. Damit vereinfacht sich das Holen der Bewertungen im obigen Videoservice-Beispiel wie folgt, wenn man komplett auf Observables und RxJava umstellt:

import rx.lang.scala.Observable

case class User(name: String, videos: List[Video])
case class Video(id: String, rating: Rating)
case class Rating(rating: Int)

trait VideoService {
def getVideos(user: User): Observable[Video]
def getRating(video: Video): Observable[Rating]
}

object Videos extends VideoService {
def getVideos(user: User) =
Observable(user.videos: _*)
def getRating(video: Video) =
Observable(video.rating)
}

object Main extends App {
val videos = Videos.getVideos(
User("joe", List(Video("V1", Rating(9)),
Video("V2", Rating(3)))))

val ratings = videos flatMap Videos.getRating
ratings subscribe (onNext = println)
}

Fazit

Das Bearbeiten der asynchron eintreffenden Ergebnisse ist so kaum schwieriger, als es bei einer synchronen API wäre, und das war das eingangs gestellte Ziel. Dank der vielen zur Verfügung stehenden Operatoren in Rx ist es zudem einfach, selbst deutlich komplexere oder tiefer verschachtelte asynchrone Ereignisse weiterzuverarbeiten. Dadurch wandern die Callbacks im Quelltext an eine von der Geschäftslogik getrennte, einzelne Stelle. So sind die Observables aus Rx ein wesentlicher Baustein, um eine Anwendung reaktiv im Sinne von reaktionsfähig und ereignisgesteuert entwickeln zu können. (ane)

Joachim Hofer
studierte Informatik an der Friedrich-Alexander-Universität Erlangen-Nürnberg. Bei der imbus AG leitet er das TestBench-Entwicklungsteam. In seiner Freizeit entwickelt er an Open-Source-Projekten in den Bereichen Scala, SBT und Code Coverage mit.