Message Queues mit AMQP und Node.js

Senden und Empfangen

Nachrichten versenden

Für den Versand einer Nachricht benötigt man bei AMQP zumindest aus Sicht des Producers lediglich einen Exchange, aber nicht zwingend eine Queue. Daher genügt der oben gezeigte Code bereits, um einen einfachen Producer zu implementieren. Dabei übernimmt die publish-Funktion, die zusätzlich zur eigentlichen Nachricht einen Routingschlüssel und ein Parameterobjekt erwartet, das Versenden. Der Routingschlüssel ist eine einfache Zeichenkette, als Nachricht lassen sich Objekte und Buffer verwenden:

exchange.publish('...', new Buffer(...), {
...
}, function (hasErrorOccured, err) {
// ...
});

Das Parameterobjekt nimmt eine ganze Reihe von Optionen entgegen, von denen die meisten allerdings lediglich Metadaten der eigentlichen Nachricht darstellen. Dazu zählen beispielsweise benutzerdefinierte Header, IDs und diverse Zeitstempel.

All diese Informationen lassen sich allerdings ohne Weiteres direkt in der Nachricht unterbringen, weshalb die Anzahl der wirklich erforderlichen Optionen verhältnismäßig gering ausfällt:

  • Die Eigenschaft deliveryMode gibt an, ob die Nachricht zu persistieren ist oder nicht. Die Option ist aus naheliegenden Gründen ausschließlich für Exchanges sinnvoll nutzbar, die mit der Option durable erstellt wurden. Gibt man den Wert 1 an, persistiert die Anwendung die Nachricht nicht, beim Wert 2 schreibt sie sie fest.
  • mandatory steuert das Verhalten von RabbitMQ, wenn man eine Nachricht versendet, obwohl keine Queue am Exchange registriert ist: Weist man der Eigenschaft den Wert false zu, verwirft RabbitMQ die Nachricht. Das entspricht zugleich dem Standardverhalten. Gibt man hingegen den Wert true an, löst node-amqp einen Fehler aus.
  • Die immediate-Eigenschaft ähnelt der mandatory-Eigenschaft in gewissem Sinne: Sie steuert das Verhalten von RabbitMQ, wenn man eine Nachricht versendet, aber kein Consumer bereit ist, die Nachricht sofort zu verarbeiten. Weist man der Eigenschaft den Wert false zu, wird die Nachricht in die Queue geschrieben. Auch das entspricht dem Standardverhalten. Gibt man den Wert true an, löst node-amqp einen Fehler aus, sollte kein Consumer sofort verfügbar sein.

Wichtig zu wissen ist, dass node-amqp den Callback der publish-Funktion ausschließlich dann auslöst, wenn man den Exchange mit der confirm-Option erzeugt hat. Ungewöhnlich für Node.js ist die Signatur des Callbacks, die sich nicht an das unter Node.js gängige Muster hält:

function (hasErrorOccured, err)

Der erste Paramater gibt als logischer Wert an, ob überhaupt ein Fehler aufgetreten ist. Enthält hasErrorOccured den Wert false, so bedeutet das, dass die Nachricht erfolgreich beim Empfänger eingegangen ist. Trat hingegen ein Fehler auf, finden sich Details im Parameter err.

Mit diesem Wissen kann man die vorhandenen Codeabschnitte nun zusammenfügen und einen einfachen Producer implementieren:

var connection = amqp.createConnection({
url: 'amqp://<username>:<password>@<host>:<port>/<vhost>',
heartbeat: 60
});

connection.once('ready', function () {
connect.exchange('status', {
type: 'direct',
durable: true,
confirm: true,
autoDelete: false
}, function (exchange) {
exchange.publish('', { status: 'Online' }, {
deliveryMode: 2
}, function (hasErrorOccured, err) {
if (hasErrorOccured) {
return console.log(err.message);
}
console.log('Nachricht versendet!');
});
});
});

Queues definieren und registrieren

Um einen Consumer zu implementieren, kann man mit Ausnahme des publish-Aufrufs das gleiche Code-Grundgerüst verwenden. Allerdings ist außer einem Exchange auch eine Queue nötig, die man auf einen Exchange registrieren kann. In node-amqp sind dafür zwei separate Schritte nötig.

Zunächst ist die Queue zu erzeugen. Das übernimmt die Funktion queue, die – wider Erwarten – nicht an einem Exchange, sondern an der Verbindung aufzurufen ist. Die erforderlichen Parameter gleichen jenen der exchange-Funktion. Zusätzlich zum Namen der Queue erwartet die Funktion ein Parameterobjekt:

connection.queue('...', { ... }, function (queue) {
// ...
});

Lediglich die verfügbaren Optionen unterscheiden sich leicht, sie sind allerdings bereits weitestgehend bekannt: Die Eigenschaften passive, durable und autoDelete verhalten sich analog zu den Optionen von Exchanges.

Neu ist einzig exclusive, die festlegt, ob die Queue ausschließlich von der aktuellen Verbindung ausgelesen werden darf. Standardmäßig ist das abgeschaltet. Um die Exklusivität der Queue zu aktivieren, muss man der Eigenschaft den Wert true zuweisen. Das impliziert zudem das automatische Aktivieren der autoDelete-Eigenschaft.

Als zweiter Schritt ist die erzeugte Queue noch an einem Exchange zu registrieren. Dazu dient die bind-Funktion, die neben dem Namen des Exchanges auch den zu verwendenden Routingschlüssel erwartet. Sobald sie die Queue registriert hat, löst sie das queueBindOk-Ereignis aus:

queue.bind('...', '...');
queue.once('queueBindOk', function () {
// ...
});

Die vollständige Definition einer Queue sieht damit wie folgt aus:

connection.queue('statusQueue', {
durable: true,
autoDelete: false
}, function (queue) {
queue.bind('status', '');
queue.once('queueBindOk', function () {
// ...
});
});

Nachrichten empfangen

Nachdem die Queue zur Verfügung steht und mit einem Exchange verbunden ist, kann sich der Consumer endlich um den Empfang von Nachrichten kümmern. Dazu benötigt er die subscribe-Funktion, die außer einem Parameterobjekt lediglich einen Callback erwartet, den die Anwendung pro empfangener Nachricht aufruft:

queue.subscribe({ ... }, function (message, headers, info) {
// ...
});

Der Parameter message enthält die eigentliche Nachricht, headers die Metadaten (falls vorhanden), und info ein Objekt mit zusätzlichen Daten. Unter anderem lässt sich darüber der verwendete Routingschlüssel abrufen.

Um den Empfang von Nachrichten zu bestätigen, ist der Eigenschaft ack der Wert true zuzuweisen. Das bewirkt, dass RabbitMQ keine weiteren Nachrichten zustellt, bevor die Gegenstelle den Empfang nicht bestätigt hat. Dazu dient die shift-Funktion, deren Signatur leider alles andere als intuitiv ist: Um die erfolgreiche Verarbeitung einer Nachricht zu signalisieren, ist die Funktion ohne Parameter aufzurufen.

Alternativ kann man zwei logische Parameter angeben: Der erste informiert darüber, ob die Nachricht zurückgewiesen werden soll, der zweite, ob sie erneut in die Queue einzufügen ist. In der Regel gibt man für beide Parameter den Wert true an, es sind allerdings auch Kombinationen denkbar.

Ein einfacher Consumer, der den Empfang von Nachrichten bestätigen soll, lässt sich nun wie folgt implementieren:

connection.queue('statusQueue', {
durable: true,
autoDelete: false
}, function (queue) {
queue.bind('status', '');
queue.once('queueBindOk', function () {
queue.subscribe({
ack: true
}, function (message) {
// Handle message
if (err) {
return queue.shift(true, true);
}
queue.shift();
});
});
});