zurück zum Artikel

Message Queues mit AMQP und Node.js

Architektur/Methoden
Message Queues mit AMQP und Node.js

Message Queues erleichtern das Entwickeln entkoppelter, autonomer Dienste in verteilten Architekturen. Als Protokoll kommt hierfür häufig AMQP zum Einsatz, das sich auch in Verbindung mit RabbitMQ und Node.js verwenden lässt. Allerdings gilt es, dabei einige Stolperfallen zu beachten.

Die Entwicklung von Webanwendungen ist naturgemäß verteilt. Schon die Kombination aus Webbrowser und -server stellt ein verteiltes System dar. Aus dem Grund müssen Entwickler auch ohne die Integration zusätzlicher Webdienste einige Besonderheiten verteilter Systeme beachten: Hierzu zählt unter anderem der Umgang mit einer langsamen oder gar abgebrochenen Verbindung.

Damit der Ausfall einer einzelnen Komponente nicht den Rest der Webanwendung beeinträchtigt, muss jede Komponente autonom arbeiten können. Es gilt also, sie zu entkoppeln. Eine gangbare Maßnahme hierfür ist der Verzicht auf Pull-Verfahren: Da eine Abfrage ein funktionierendes und erreichbares Gegenüber voraussetzt, liegt es nahe, dass dieses Vorgehen in verteilten Systemen gewisse Nachteile
birgt. Als Alternative bieten sich Push-Verfahren an, die die zu übertragenden Nachrichten bei einer fehlenden oder fehlerhaften Verbindung zunächst lokal zwischenspeichern, um sie dann zu einem späteren Zeitpunkt auszuliefern. Der Nachteil der Methode ist, dass sie sich nicht für alle Szenarien gleichermaßen eignet: Eine Suchmaschine wie Google ist beispielsweise kaum als Push-Dienst denkbar. Architekturen wie Command Query Responsibility Segregation (CQRS) beweisen allerdings, dass sich das Pushen von Nachrichten als Standardvorgehen für ein verteiltes Systems erfolgreich einsetzen lässt.

Eine Message Queue verwenden

Um den Versand von Nachrichten durchzuführen, benötigt man neben einem Transportkanal zumindest einen lokalen Puffer, der nicht versandfähige Nachrichten gegebenenfalls zwischenspeichert. Beide Aspekte kann man hervorragend in einem gemeinsamen Dienst kapseln, der anschließend den übrigen Komponenten zur Verfügung steht.

Genau das ist die primäre Aufgabe einer Message Queue: Sie nimmt Nachrichten von anderen Komponenten entgegen und speichert sie zwischen, bis die Zielkomponente verfügbar ist. Die versendende Komponente wird dabei als Publisher bezeichnet, die empfangende als Consumer (siehe
Abbildung 1).

Die einfachste Struktur von Message Queues entkoppelt zwei Komponenten, den Publisher und den Consumer, durch einen Puffer. (Abb. 1)
Die einfachste Struktur von Message Queues entkoppelt zwei Komponenten, den Publisher und den Consumer, durch einen Puffer. (Abb. 1)


Darüber hinaus nehmen Message Queues häufig noch weitere Aufgaben wahr, unter anderem das Routing an Consumer und das garantierte Zustellen von Nachrichten.

Um unterschiedliche Implementierungen von Message Queues einheitlich ansprechen zu können, wurden in den vergangenen Jahren mehrere Protokolle entwickelt – unter anderem das Advanced Message Queuing Protocol (AMQP), das Stream Text Oriented Messaging Protol (STOMP) und Message Queue Telemetry Transport (MQTT). Sie unterscheiden sich bezüglich ihrer Ausrichtung und ihrer Fähigkeiten teilweise deutlich voneinander: Während AMQP primär im Unternehmensbereich zum Einsatz kommt, stellt MQTT den De-facto-Standard für den IoT-Bereich (Internet of Things) dar.

Eine weit verbreitete Message Queue ist RabbitMQ [1], die VMware in der Programmiersprache Erlang entwickelt und kostenfrei unter einer Open-Source-Lizenz (Mozilla Public License [2]) zur Verfügung stellt. RabbitMQ beherrscht von Haus aus das AMQP-Protokoll, lässt sich jedoch mit Plug-ins um die Unterstützung für STOMP [3] und MQTT [4] erweitern.

Exchanges, Queues & Co.

Im Unterschied zu der in Abbildung 1 dargestellten Struktur entkoppelt RabbitMQ den Publisher nochmals von der eigentlichen Queue: Dazu dient der sogenannte Exchange (siehe Abbildung 2).

Der Exchange entkoppelt den Producer von der eigentlichen Queue und eröffnet so unterschiedliche Möglichkeiten für das Routing von Nachrichten. (Abb. 2)
Der Exchange entkoppelt den Producer von der eigentlichen Queue und eröffnet so unterschiedliche Möglichkeiten für das Routing von Nachrichten. (Abb. 2)

Er ermöglicht zwei Szenarien der Zustellung von Nachrichten:

Teilen sich mehrere Consumer eine Queue, verteilt RabbitMQ die Nachrichten auf die einzelnen Consumer. (Abb. 3)
Teilen sich mehrere Consumer eine Queue, verteilt RabbitMQ die Nachrichten auf die einzelnen Consumer. (Abb. 3)
Verfügt jeder Consumer hingegen über eine eigene Queue, erhält er alle Nachrichten, die über den Exchange versendet werden. (Abb. 4)
Verfügt jeder Consumer hingegen über eine eigene Queue, erhält er alle Nachrichten, die über den Exchange versendet werden. (Abb. 4)

Allerdings ist solch ein Verhalten ebenfalls konfigurierbar, denn ein Exchange lässt sich in unterschiedlichen Modi betreiben. Standardmäßig verwenden Exchanges in RabbitMQ den "Direct"-Modus, alternativ kann man sie allerdings auch in den "Fanout"- oder den "Topic"-Modus schalten. Sie unterscheiden sich wie folgt:

Im "Direct"-Modus berücksichtigt ein Exchange den Routingschlüssel von Nachrichten, um sie gezielt an bestimmte Queues auszuliefern. (Abb. 5)
Im "Direct"-Modus berücksichtigt ein Exchange den Routingschlüssel von Nachrichten, um sie gezielt an bestimmte Queues auszuliefern. (Abb. 5)

Zusätzlich zu der Konfiguration der Exchanges verfügt RabbitMQ über einige weitere Optionen zur Konfiguration. Beispielsweise lässt sich einstellen, ob die Lebensdauer einer Queue an die Lebensdauer des zugehörigen Consumers zu binden ist: Trennt der Consumer die Verbindung, kann RabbitMQ die zugehörige Queue auf Wunsch automatisch entfernen.

Außerdem lassen sich Exchanges und Queues als "durable" kennzeichnen, was bewirkt, dass RabbitMQ sämtliche Nachrichten persistiert. Aktiviert man die Option, überleben noch nicht ausgelieferte Nachrichten einen Absturz oder Neustart von RabbitMQ. Zugleich sinkt allerdings deren Leistungsfähigkeit, da Lese- und Schreibzugriffe auf ein physisches Laufwerk weitaus langsamer sind als jene auf den Arbeitsspeicher.

Zu guter Letzt lässt sich auch eine Zustellbestätigung anfordern: Normalerweise entfernt RabbitMQ Nachrichten aus Queues, sobald sie sie an den zugehörigen Consumer übertragen hat. Stürzt der Consumer anschließend jedoch ab, ohne die Nachricht vollständig zu verarbeiten, ist sie verloren. Dem lässt sich mit Zustellbestätigungen ein einfacher Riegel vorschieben: Verliert eine Queue die Verbindung zu ihrem Consumer, liefert sie bei der nächsten Verbindung all die Nachrichten erneut aus, deren Verarbeitung noch nicht bestätigt wurde.

Anbindung

Node.js und AMQP verbinden

Um RabbitMQ aus Node.js anzusteuern, stehen unterschiedliche Module zur Verfügung. Um zu einem späteren Zeitpunkt potenziell auf andere Message-Queue-Implementierungen ausweichen zu können, empfiehlt es sich, ein Modul zu verwenden, das AMQP ohne RabbitMQ-spezifische Besonderheiten implementiert. Beispiele hierfür sind amqplib [5] und node-amqp [6].

Zieht man als Auswahlkriterium für ein Modul die Anzahl seiner Stargazer auf GitHub heran, weist node-amqp die mit Abstand weiteste Verbreitung auf. Leider spiegelt sich das nur bedingt in der Qualität des Moduls wieder: Die über npm erhältliche Version verhält sich stellenweise fehlerhaft und entspricht in weiten Teilen nicht der auf GitHub bereitgestellten Dokumentation.

Aus dem Grund ist eine Installation in den lokalen Kontext der Anwendung mit dem üblichen Kommando

$ npm install amqp

zwar durchaus möglich, aber nicht ratsam. Die weitaus bessere Wahl stellt ein möglichst aktueller Stand aus dem Master-Branch auf GitHub dar, der sich ebenfalls mit npm installieren lässt. Allerdings benötigt man dazu die ID des gewünschten Commits.

Der Autor des vorliegenden Artikels hat gute Erfahrungen mit der Version aus Commit c4a4ddaa0cd8e449f89565624ccddf7599eec095 gemacht, die sich mit dem Kommando

$ npm install git+https://github.com/postwait/
node-amqp.git#c4a4ddaa0cd8e449f89565624ccddf7599eec095

einrichten lässt.

Wer die lokale Installation von RabbitMQ scheut, kann beispielsweise auf CloudAMQP [7] zurückgreifen: Das Unternehmen bietet in der Cloud betriebene schlüsselfertige Installationen von RabbitMQ an. Für den Einstieg und zum Experimentieren stellt CloudAMQP zudem einen kostenfreien Plan zur Verfügung. Er ist zwar auf drei gleichzeitige Verbindungen und 30 MByte an monatlichem Datenvolumen begrenzt, für die ersten Schritte ist das aber mehr als ausreichend.

Nach der Installation kann der Entwickler das Modul mit Hilfe der require-Funktion in eine eigene Anwendung integrieren:

var amqp = require('amqp');

Als Nächstes muss man nun eine Verbindung zu einer Instanz von RabbitMQ aufbauen. Dazu dient die Funktion createConnection, die als Parameter im einfachsten Fall lediglich eine URL erwartet.

var connection = amqp.createConnection({
url: 'amqp://<username>:<password>@<host>:<port>/<vhost>'
});

Entspricht der Port der Standardeinstellung von RabbitMQ (5672), lässt sich auf dessen Angabe verzichten. Auch SSL- beziehungsweise TLS-verschlüsselte Verbindungen sind möglich, indem man als Protokoll amqps an Stelle von amqp angibt. Erfahrungsgemäß ist es hilfreich, zusätzlich zur URL den Parameter heartbeat anzugeben, der hilft, die Verbindung aufrecht zu erhalten. Ein Wert von 60 Sekunden kann dabei als erste Richtlinie für eigene Experimente dienen:

var connection = amqp.createConnection({
url: 'amqp://<username>:<password>@<host>:<port>/<vhost>',
heartbeat: 60
});

Nach dem Verbinden muss man zunächst auf das ready-Ereignis warten, bevor sich Exchange und Queues definieren lassen. Bricht die Verbindung ab, versucht node-amqp automatisch, sie wiederherzustellen. Gelingt das, löst das Modul das ready-Ereignis erneut aus.

Um zu verhindern, dass node-amqp die bereits bestehenden Exchanges und Queues erneut definiert, muss man darauf achten, das ready-Ereignis in Kombination mit der once- und nicht der on-Funktion zu verwenden:

connection.once('ready', function () {
// ...
});

Exchanges definieren

Von Haus aus stellt RabbitMQ einen Exchange im "Direct"-Modus zur Verfügung, allerdings lässt sich dessen Konfiguration – einmal festgelegt – nicht mehr ändern. Abhilfe ist dann nur über das Löschen und erneute Erzeugen des Exchanges möglich.

Daher empfiehlt es sich, den enthaltenen Exchange nicht zu verwenden, sondern von vornherein einen eigenen mit der gewünschten Konfiguration anzulegen. Dazu dient die Funktion exchange, die den gewählten Namen des neuen Exchanges und dessen Konfiguration als Parameterobjekt erwartet:

connect.exchange('...', { ... }, function (exchange) {
// ...
});

Während der Name frei wählbar ist, verdient das Parameterobjekt besondere Aufmerksamkeit. Es kann
die folgenden Optionen enthalten: Die type-Eigenschaft gibt an, in welchem Modus der Exchange zu betreiben ist. Gültige Werte sind direct, fanout und topic. Wird sie nicht angegeben, betreibt RabbitMQ den Exchange automatisch im "topic"-Modus. Die passive-Eigenschaft gibt an, ob der Exchange zu erzeugen ist, sollte er nicht existieren. Dieses Verhalten entspricht dem Wert false, der auch als Standardwert dient.

Ob eine Zustellbestätigung für Nachrichten, die der Exchange versendet, erforderlich ist, definiert die Eigenschaft confirm. Standardmäßig ist das Verhalten abgeschaltet. Um es zu aktivieren, muss man der Eigenschaft den Wert true zuweisen.

Die Eigenschaft durable legt fest, ob RabbitMQ Nachrichten persistieren soll, die über den Exchange versendet werden, sodass sie auch nach einem Neustart noch vorhanden sind. Da die Leistung bei abgeschalteter Persistenz höher ist, entspricht das dem Standardverhalten. Um die zusätzliche Archivierung zu aktivieren, ist der Eigenschaft der Wert true zuzuweisen. autoDelete gibt schließlich an, ob RabbitMQ den Exchange automatisch löschen soll, wenn sich alle Queues abgemeldet haben. Standardmäßig weist diese Eigenschaft den Wert true auf.

Will man nun beispielsweise einen Exchange im "Direct"-Modus erzeugen, der dauerhaft verfügbar ist, Nachrichten persistiert und deren erfolgreiche Verarbeitung bestätigt, ist der folgende Funktionsaufruf erforderlich:

connect.exchange('status', {
type: 'direct',
durable: true,
confirm: true,
autoDelete: false
}, function (exchange) {
// ...
});

Sobald der Exchange zur Verfügung steht, löst node-amqp den angegebenen Callback aus und übergibt eine Referenz auf den Exchange als Parameter.

Senden und Empfangen

Nachrichten versenden

Für den Versand einer Nachricht benötigt man bei AMQP zumindest aus Sicht des Producers lediglich einen Exchange, aber nicht zwingend eine Queue. Daher genügt der oben gezeigte Code bereits, um einen einfachen Producer zu implementieren. Dabei übernimmt die publish-Funktion, die zusätzlich zur eigentlichen Nachricht einen Routingschlüssel und ein Parameterobjekt erwartet, das Versenden. Der Routingschlüssel ist eine einfache Zeichenkette, als Nachricht lassen sich Objekte und Buffer verwenden:

exchange.publish('...', new Buffer(...), {
...
}, function (hasErrorOccured, err) {
// ...
});

Das Parameterobjekt nimmt eine ganze Reihe von Optionen entgegen, von denen die meisten allerdings lediglich Metadaten der eigentlichen Nachricht darstellen. Dazu zählen beispielsweise benutzerdefinierte Header, IDs und diverse Zeitstempel.

All diese Informationen lassen sich allerdings ohne Weiteres direkt in der Nachricht unterbringen, weshalb die Anzahl der wirklich erforderlichen Optionen verhältnismäßig gering ausfällt:

Wichtig zu wissen ist, dass node-amqp den Callback der publish-Funktion ausschließlich dann auslöst, wenn man den Exchange mit der confirm-Option erzeugt hat. Ungewöhnlich für Node.js ist die Signatur des Callbacks, die sich nicht an das unter Node.js gängige Muster hält:

function (hasErrorOccured, err)

Der erste Paramater gibt als logischer Wert an, ob überhaupt ein Fehler aufgetreten ist. Enthält hasErrorOccured den Wert false, so bedeutet das, dass die Nachricht erfolgreich beim Empfänger eingegangen ist. Trat hingegen ein Fehler auf, finden sich Details im Parameter err.

Mit diesem Wissen kann man die vorhandenen Codeabschnitte nun zusammenfügen und einen einfachen Producer implementieren:

var connection = amqp.createConnection({
url: 'amqp://<username>:<password>@<host>:<port>/<vhost>',
heartbeat: 60
});

connection.once('ready', function () {
connect.exchange('status', {
type: 'direct',
durable: true,
confirm: true,
autoDelete: false
}, function (exchange) {
exchange.publish('', { status: 'Online' }, {
deliveryMode: 2
}, function (hasErrorOccured, err) {
if (hasErrorOccured) {
return console.log(err.message);
}
console.log('Nachricht versendet!');
});
});
});

Queues definieren und registrieren

Um einen Consumer zu implementieren, kann man mit Ausnahme des publish-Aufrufs das gleiche Code-Grundgerüst verwenden. Allerdings ist außer einem Exchange auch eine Queue nötig, die man auf einen Exchange registrieren kann. In node-amqp sind dafür zwei separate Schritte nötig.

Zunächst ist die Queue zu erzeugen. Das übernimmt die Funktion queue, die – wider Erwarten – nicht an einem Exchange, sondern an der Verbindung aufzurufen ist. Die erforderlichen Parameter gleichen jenen der exchange-Funktion. Zusätzlich zum Namen der Queue erwartet die Funktion ein Parameterobjekt:

connection.queue('...', { ... }, function (queue) {
// ...
});

Lediglich die verfügbaren Optionen unterscheiden sich leicht, sie sind allerdings bereits weitestgehend bekannt: Die Eigenschaften passive, durable und autoDelete verhalten sich analog zu den Optionen von Exchanges.

Neu ist einzig exclusive, die festlegt, ob die Queue ausschließlich von der aktuellen Verbindung ausgelesen werden darf. Standardmäßig ist das abgeschaltet. Um die Exklusivität der Queue zu aktivieren, muss man der Eigenschaft den Wert true zuweisen. Das impliziert zudem das automatische Aktivieren der autoDelete-Eigenschaft.

Als zweiter Schritt ist die erzeugte Queue noch an einem Exchange zu registrieren. Dazu dient die bind-Funktion, die neben dem Namen des Exchanges auch den zu verwendenden Routingschlüssel erwartet. Sobald sie die Queue registriert hat, löst sie das queueBindOk-Ereignis aus:

queue.bind('...', '...');
queue.once('queueBindOk', function () {
// ...
});

Die vollständige Definition einer Queue sieht damit wie folgt aus:

connection.queue('statusQueue', {
durable: true,
autoDelete: false
}, function (queue) {
queue.bind('status', '');
queue.once('queueBindOk', function () {
// ...
});
});

Nachrichten empfangen

Nachdem die Queue zur Verfügung steht und mit einem Exchange verbunden ist, kann sich der Consumer endlich um den Empfang von Nachrichten kümmern. Dazu benötigt er die subscribe-Funktion, die außer einem Parameterobjekt lediglich einen Callback erwartet, den die Anwendung pro empfangener Nachricht aufruft:

queue.subscribe({ ... }, function (message, headers, info) {
// ...
});

Der Parameter message enthält die eigentliche Nachricht, headers die Metadaten (falls vorhanden), und info ein Objekt mit zusätzlichen Daten. Unter anderem lässt sich darüber der verwendete Routingschlüssel abrufen.

Um den Empfang von Nachrichten zu bestätigen, ist der Eigenschaft ack der Wert true zuzuweisen. Das bewirkt, dass RabbitMQ keine weiteren Nachrichten zustellt, bevor die Gegenstelle den Empfang nicht bestätigt hat. Dazu dient die shift-Funktion, deren Signatur leider alles andere als intuitiv ist: Um die erfolgreiche Verarbeitung einer Nachricht zu signalisieren, ist die Funktion ohne Parameter aufzurufen.

Alternativ kann man zwei logische Parameter angeben: Der erste informiert darüber, ob die Nachricht zurückgewiesen werden soll, der zweite, ob sie erneut in die Queue einzufügen ist. In der Regel gibt man für beide Parameter den Wert true an, es sind allerdings auch Kombinationen denkbar.

Ein einfacher Consumer, der den Empfang von Nachrichten bestätigen soll, lässt sich nun wie folgt implementieren:

connection.queue('statusQueue', {
durable: true,
autoDelete: false
}, function (queue) {
queue.bind('status', '');
queue.once('queueBindOk', function () {
queue.subscribe({
ack: true
}, function (message) {
// Handle message
if (err) {
return queue.shift(true, true);
}
queue.shift();
});
});
});

Fazit

Es existieren unterschiedliche Module, um eine AMQP-basierte Message Queue wie beispielsweise RabbitMQ aus Node.js heraus anzusprechen und Producer und Consumer zu implementieren. Als in Bezug auf AMQP am weitesten verbreitete Option bietet sich hierfür node-amqp an. Das Modul fällt allerdings durch schlechte Dokumentation und einen veralteten öffentlichen Stand auf. Deshalb ist es zwingend erforderlich, mit einer aktuellen Variante aus dem master-Branch von GitHub zu arbeiten.

Leider ist auch die API nicht konsistent, da teilweise Callbacks, teilweise Ereignisse zum Einsatz kommen. All das erschwert die Einarbeitung in node-amqp unnötig. Hat man die Hürden allerdings einmal umschifft, lassen sich verlässliche Anwendungen entwickeln, die in einer verteilten Architektur unabhängig und entkoppelt voneinander arbeiten können.

Alternativ zu node-amqp wirkt das Modul amqplib vielversprechend – zumindest macht es einen weitaus aufgeräumteren und gepflegteren Eindruck. Allerdings mangelt es zumindest bislang noch an der Verbreitung, was sich insbesondere in der Welt von Node.js allerdings rasch ändern kann. Es bleibt daher abzuwarten, wie sich beide Module in den nächsten Monaten weiterentwickeln. (jul [8])

Golo Roden
ist Gründer der "the native web UG", eines auf native Webtechniken spezialisierten Unternehmens. Für die Entwicklung moderner Webanwendungen bevorzugt er JavaScript und Node.js und hat mit "Node.js & Co." das erste deutschsprachige Buch zum Thema geschrieben.


URL dieses Artikels:
http://www.heise.de/-2099074

Links in diesem Artikel:
[1] http://www.rabbitmq.com/
[2] http://www.rabbitmq.com/mpl.html
[3] https://www.rabbitmq.com/stomp.html
[4] https://www.rabbitmq.com/mqtt.html
[5] https://github.com/squaremo/amqp.node
[6] https://github.com/postwait/node-amqp
[7] http://www.cloudamqp.com/
[8] mailto:jul@heise.de