Apache Kafka als Backend für Webanwendungen?

Netzsockel

Für die Anbindung der Clients sollen Websockets dienen. Sie erlauben eine bidirektionale Kommunikation zwischen Client und Server, die mit Events gesteuert werden. Somit ist keine der beiden Seiten blockiert oder muss auf das Gegenüber warten. Hierfür kommt die Bibliothek ws zum Einsatz. Sie bietet eine reine Serverimplementation. Am Client wird eine native Websocket-Implementierung vorausgesetzt, was ältere Browser ausschließt. Für den Test mit Kafka ist das ausreichend, für richtige Anwendungen sind jedoch mehrere Alternativen abzuwägen und je nach Voraussetzungen auszuwählen.

const WebSocket         = require('ws')
const kafkaProducer = require('./controllers/kafka.producer');
const kafkaConsumer = require('./controllers/kafka.consumer');

const wss = new WebSocket.Server({ port: 8989 })

const broadcast = (data, ws) => {
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN && client !== ws) {
client.send(JSON.stringify(data))
}
})
}

wss.on('connection', (ws) => {

kafkaConsumer.subscribeToTopics(['ADD_USER', 'ADD_MESSAGE'], ws);

ws.on('message', (message) => {
const data = JSON.parse(message)
console.log('send message', message)

...

kafkaProducer.createMessage(...)

...
})

ws.on('broadcast', (broadcastData) => {
broadcast(broadcastData, ws)
})

ws.on('close', () => {
...
})
})

Zunächst wird eine Instanz des Websocket-Servers erzeugt, die auf Port 8989 zu erreichen ist. Sobald sie zur Verfügung steht, kann man mit dem connection-Event darauf reagieren. Zunächst wird die Verbindung Richtung Kafka aufgebaut, indem die subscribeToTopics-Methode die relevanten Topics anlegt und darauf einen Consumer initialisiert (siehe die bisherigen Codebeispiele). Schicken die Clients neue Nachrichten via Websocket an den Server, kann man, wie bei der kafka-node-API, mit einem message-Event reagieren. An dieser Stelle wird auch der Producer aufgerufen, womit die Nachrichten in Kafka abgelegt werden. Damit auch der umgekehrte Weg funktioniert, gibt man beim Eintreffen neuer Nachrichten am Consumer diese nicht wie zuvor auf die Konsole aus, sondern löst am Websocket den broadcast-Event aus. Die Verbindung zum Socket wurde, zuvor durch die Übergabe an die subscribeToTopics-Methode hergestellt, sodass der Consumer Zugriff auf diesen hat.

ws.emit('broadcast', JSON.parse(data.value));

Dadurch wird im Anschluss ein Broadcast an alle bekannten (Websocket-)Clients ausgelöst, die dann ihrerseits die Nachricht verarbeiten können. Meldet sich einer der Clients ab, können Anwender mit dem close-Event auf das Abbauen der Websocket-Verbindung reagieren, was im Falle des Chats dazu benutzt wird, die Liste der aktiven Benutzer zu aktualisieren.

const socket = new WebSocket('ws://localhost:8989')

socket.onopen = () => {
socket.send( ... )
}
socket.onmessage = (event) => {
const data = JSON.parse(event.data)
...
}

Der hier gezeigte Clientcode ist das letzte Verbindungstück, um das Frontend mit der Middleware zu verbinden. Der Websocket verbindet sich mit dem konfigurierten Serverport, und beim Aufbau der Verbindung (socket.onopen) wird direkt eine Nachricht mit der Anmeldung des neuen Nutzers gesendet (socket.send). Wenn vom Server eine Nachricht ausgeht, kann man mit socket.onmessage darauf reagieren. Beim Chat selbst Redux-Actions erzeugt, sodass die Reducer den State aktualisieren können. Für den Rückweg kommt Redux-Saga zum Einsatz, das auf Neue-Nachrichten-Aktionen reagiert und sie per socket.send an den Server weitergibt. Die Abbildung zeigt das fertige Ergebnis des laufenden Chats mit Kafka.