Message Queues mit AMQP und Node.js

Anbindung

Node.js und AMQP verbinden

Um RabbitMQ aus Node.js anzusteuern, stehen unterschiedliche Module zur Verfügung. Um zu einem späteren Zeitpunkt potenziell auf andere Message-Queue-Implementierungen ausweichen zu können, empfiehlt es sich, ein Modul zu verwenden, das AMQP ohne RabbitMQ-spezifische Besonderheiten implementiert. Beispiele hierfür sind amqplib und node-amqp.

Zieht man als Auswahlkriterium für ein Modul die Anzahl seiner Stargazer auf GitHub heran, weist node-amqp die mit Abstand weiteste Verbreitung auf. Leider spiegelt sich das nur bedingt in der Qualität des Moduls wieder: Die über npm erhältliche Version verhält sich stellenweise fehlerhaft und entspricht in weiten Teilen nicht der auf GitHub bereitgestellten Dokumentation.

Aus dem Grund ist eine Installation in den lokalen Kontext der Anwendung mit dem üblichen Kommando

$ npm install amqp

zwar durchaus möglich, aber nicht ratsam. Die weitaus bessere Wahl stellt ein möglichst aktueller Stand aus dem Master-Branch auf GitHub dar, der sich ebenfalls mit npm installieren lässt. Allerdings benötigt man dazu die ID des gewünschten Commits.

Der Autor des vorliegenden Artikels hat gute Erfahrungen mit der Version aus Commit c4a4ddaa0cd8e449f89565624ccddf7599eec095 gemacht, die sich mit dem Kommando

$ npm install git+https://github.com/postwait/
node-amqp.git#c4a4ddaa0cd8e449f89565624ccddf7599eec095

einrichten lässt.

Wer die lokale Installation von RabbitMQ scheut, kann beispielsweise auf CloudAMQP zurückgreifen: Das Unternehmen bietet in der Cloud betriebene schlüsselfertige Installationen von RabbitMQ an. Für den Einstieg und zum Experimentieren stellt CloudAMQP zudem einen kostenfreien Plan zur Verfügung. Er ist zwar auf drei gleichzeitige Verbindungen und 30 MByte an monatlichem Datenvolumen begrenzt, für die ersten Schritte ist das aber mehr als ausreichend.

Nach der Installation kann der Entwickler das Modul mit Hilfe der require-Funktion in eine eigene Anwendung integrieren:

var amqp = require('amqp');

Als Nächstes muss man nun eine Verbindung zu einer Instanz von RabbitMQ aufbauen. Dazu dient die Funktion createConnection, die als Parameter im einfachsten Fall lediglich eine URL erwartet.

var connection = amqp.createConnection({
url: 'amqp://<username>:<password>@<host>:<port>/<vhost>'
});

Entspricht der Port der Standardeinstellung von RabbitMQ (5672), lässt sich auf dessen Angabe verzichten. Auch SSL- beziehungsweise TLS-verschlüsselte Verbindungen sind möglich, indem man als Protokoll amqps an Stelle von amqp angibt. Erfahrungsgemäß ist es hilfreich, zusätzlich zur URL den Parameter heartbeat anzugeben, der hilft, die Verbindung aufrecht zu erhalten. Ein Wert von 60 Sekunden kann dabei als erste Richtlinie für eigene Experimente dienen:

var connection = amqp.createConnection({
url: 'amqp://<username>:<password>@<host>:<port>/<vhost>',
heartbeat: 60
});

Nach dem Verbinden muss man zunächst auf das ready-Ereignis warten, bevor sich Exchange und Queues definieren lassen. Bricht die Verbindung ab, versucht node-amqp automatisch, sie wiederherzustellen. Gelingt das, löst das Modul das ready-Ereignis erneut aus.

Um zu verhindern, dass node-amqp die bereits bestehenden Exchanges und Queues erneut definiert, muss man darauf achten, das ready-Ereignis in Kombination mit der once- und nicht der on-Funktion zu verwenden:

connection.once('ready', function () {
// ...
});

Exchanges definieren

Von Haus aus stellt RabbitMQ einen Exchange im "Direct"-Modus zur Verfügung, allerdings lässt sich dessen Konfiguration – einmal festgelegt – nicht mehr ändern. Abhilfe ist dann nur über das Löschen und erneute Erzeugen des Exchanges möglich.

Daher empfiehlt es sich, den enthaltenen Exchange nicht zu verwenden, sondern von vornherein einen eigenen mit der gewünschten Konfiguration anzulegen. Dazu dient die Funktion exchange, die den gewählten Namen des neuen Exchanges und dessen Konfiguration als Parameterobjekt erwartet:

connect.exchange('...', { ... }, function (exchange) {
// ...
});

Während der Name frei wählbar ist, verdient das Parameterobjekt besondere Aufmerksamkeit. Es kann
die folgenden Optionen enthalten: Die type-Eigenschaft gibt an, in welchem Modus der Exchange zu betreiben ist. Gültige Werte sind direct, fanout und topic. Wird sie nicht angegeben, betreibt RabbitMQ den Exchange automatisch im "topic"-Modus. Die passive-Eigenschaft gibt an, ob der Exchange zu erzeugen ist, sollte er nicht existieren. Dieses Verhalten entspricht dem Wert false, der auch als Standardwert dient.

Ob eine Zustellbestätigung für Nachrichten, die der Exchange versendet, erforderlich ist, definiert die Eigenschaft confirm. Standardmäßig ist das Verhalten abgeschaltet. Um es zu aktivieren, muss man der Eigenschaft den Wert true zuweisen.

Die Eigenschaft durable legt fest, ob RabbitMQ Nachrichten persistieren soll, die über den Exchange versendet werden, sodass sie auch nach einem Neustart noch vorhanden sind. Da die Leistung bei abgeschalteter Persistenz höher ist, entspricht das dem Standardverhalten. Um die zusätzliche Archivierung zu aktivieren, ist der Eigenschaft der Wert true zuzuweisen. autoDelete gibt schließlich an, ob RabbitMQ den Exchange automatisch löschen soll, wenn sich alle Queues abgemeldet haben. Standardmäßig weist diese Eigenschaft den Wert true auf.

Will man nun beispielsweise einen Exchange im "Direct"-Modus erzeugen, der dauerhaft verfügbar ist, Nachrichten persistiert und deren erfolgreiche Verarbeitung bestätigt, ist der folgende Funktionsaufruf erforderlich:

connect.exchange('status', {
type: 'direct',
durable: true,
confirm: true,
autoDelete: false
}, function (exchange) {
// ...
});

Sobald der Exchange zur Verfügung steht, löst node-amqp den angegebenen Callback aus und übergibt eine Referenz auf den Exchange als Parameter.