C++20: Ein unendlicher Datenstrom mit Coroutinen

Modernes C++  –  93 Kommentare

Meine Geschichte zu Coroutinen in C++20 geht weiter. Heute tauche ich in das Coroutinen-Framework ein, um einen unendlichen Datenstrom zu erzeugen. Konsequenterweise solltest du meine zwei vorherigen Artikel "C++20: Coroutinen – ein erster Überblick" und "C++20: Mehr Details zu Coroutinen" kennen.

Das Framework zum Schreiben von Coroutinen besteht aus mehr als 20 Funktionen, die du teilweise implementieren oder teilweise überschreiben kannst. Damit lässt sich das Verhalten der Coroutinen sehr feingranular steuern. Am Ende lässt sich zum Beispiel eine Coroutine erzeugen, die einen Generator Generator<int> für einen unendlichen Datenstrom erzeugt:

Generator<int> getNext(int start = 0, int step = 1) {
auto value = start;
for (int i = 0;; ++i) {
co_yield value;
value += step;
}
}

Nun habe ich das Ziel des heutigen Artikels vorgestellt. Jetzt beginnt die Arbeit.

Das Framework

Eine Coroutine besteht aus drei Komponenten: Promise-Objekt, Coroutinen-Handle und Coroutinen-Frame.

  • Promise-Objekt: Es wird innerhalb der Coroutine verändert und gibt ihr Ergebnis mithilfe des Promise-Objekts zurück.
  • Coroutinen-Handle: Er ist ein nichtbesitzender Verweis, der außerhalb der Coroutine erlaubt, die Coroutine wieder auszuführen oder zu zerstören.
  • Coroutinen-Frame: Er ist ein interner, Heap-basierter Zustand. Dieser besteht aus dem Promise-Objekt, den kopierten Argumenten der Coroutine, den Anhaltepunkten der Coroutine (suspension points) und den lokalen Objekten, deren Gültigkeit vor oder nach dem Anhaltepunkt enden.
Ein vereinfachter Workflow

Wenn du co_yield, co_await oder co_return in einer Funktion einsetzt, wird diese Funktion zur Coroutine. Dadurch transformiert der Compiler den Funktionskörper in eine Funktion, die die folgende Struktur besitzt:

{
Promise promise;
co_await promise.initial_suspend();
try
{
<function body>
}
catch (...)
{
promise.unhandled_exception();
}
FinalSuspend:
co_await promise.final_suspend();
}

<function body> steht für den ursprünglichen Funktionskörper. Der vereinfachte Workflow besteht aus den folgenden Phasen:

Die Coroutine startet ihre Ausführung:

  • Der Coroutinen-Frame wird allokiert.
  • Alle Funktionsparameter werden in den Coroutinen-Frame kopiert,
  • Das Promise-Objekt promise wird erzeugt,
  • Der Aufruf promise.get_return_object() erzeugt den Coroutinen-Handle, der in einer lokalen Variable gespeichert wird. Das Ergebnis des Aufrufs wird an den Aufrufer zurückgegeben, falls die Coroutine angehalten wird.
  • promise.initial_suspend() wird mithilfe von co_await ausgeführt. Das Promise-Objekt gibt std::suspend_never für eine Coroutine zurück, die sofort ausgeführt wird und std::suspend_always für eine Coroutine, die erst auf Bedarf (lazy) ausgeführt wird.
  • Der Körper der Coroutine wird dann ausgeführt, wenn co_await promise.inital_suspend() fortgesetzt wird.

Die Coroutine erreicht einen Anhaltepunkt:

  • Der Coroutinen-Handle (promise.get_return_object()) wird an den Aufrufer zurückgegeben, der die Couroutine fortsetzt.

Die Coroutine erreicht co_return. Sie

  • ruft promise.return_void() für co_return oder co_return expression auf, falls expression den Datentyp void besitzt.
  • ruft promise.return_value(expression) für co_return expression auf, falls expression nicht den Datentyp void besitzt.
  • zerstört alle auf dem Stack erzeugten Variablen.
  • ruft promise.final_suspend() mithilfe von co_await auf.

Die Coroutine wird zerstört (co_return, eine nicht gefangene Ausnahme oder durch den Coroutinen-Handle). Sie

  • ruft den Destruktor des Promise-Objekts auf.
  • ruft die Destruktoren der Funktionsparameter auf.
  • gibt den Speicher der Coroutine wieder frei.
  • gibt die Ausführung an den Aufrufenden zurück.

Jetzt gilt es, die Theorie in die Praxis umzusetzen.

Ein unendlicher Datenstrom mit co_yield

Das folgende Programm erzeugt einen unendlichen Datenstrom. Die Coroutine getNext verwendet co_yield dafür. getNext erzeugt einen Datenstrom, der mit start beginnt und auf Anfrage jeweils den um step inkrementierten nächsten Wert zurückgibt:

// infiniteDataStream.cpp

#include <coroutine>
#include <memory>
#include <iostream>

template<typename T>
struct Generator {

struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;

Generator(handle_type h): coro(h) {} // (3)
handle_type coro;

~Generator() {
if ( coro ) coro.destroy();
}
Generator(const Generator&) = delete;
Generator& operator = (const Generator&) = delete;
Generator(Generator&& oth) noexcept : coro(oth.coro) {
oth.coro = nullptr;
}
Generator& operator = (Generator&& oth) noexcept {
coro = oth.coro;
oth.coro = nullptr;
return *this;
}
T getValue() {
return coro.promise().current_value;
}
bool next() { // (5)
coro.resume();
return not coro.done();
}
struct promise_type {
promise_type() = default; // (1)

~promise_type() = default;

auto initial_suspend() { // (4)
return std::suspend_always{};
}
auto final_suspend() {
return std::suspend_always{};
}
auto get_return_object() { // (2)
return Generator{handle_type::from_promise(*this)};
}
auto return_void() {
return std::suspend_never{};
}

auto yield_value(const T value) { // (6)
current_value = value;
return std::suspend_always{};
}
void unhandled_exception() {
std::exit(1);
}
T current_value;
};

};

Generator<int> getNext(int start = 0, int step = 1){
auto value = start;
for (int i = 0;; ++i){
co_yield value;
value += step;
}
}

int main() {

std::cout << std::endl;

std::cout << "getNext():";
auto gen = getNext();
for (int i = 0; i <= 10; ++i) {
gen.next();
std::cout << " " << gen.getValue(); // (7)
}

std::cout << "\n\n";

std::cout << "getNext(100, -10):";
auto gen2 = getNext(100, -10);
for (int i = 0; i <= 20; ++i) {
gen2.next();
std::cout << " " << gen2.getValue();
}

std::cout << std::endl;

}

Die main-Funktion erzeugt zwei Coroutinen. Die erste Coroutine gen gibt die Werte von 0 bis 10 zurück und die zweite Coroutine gen2 von 100 bis -100. Bevor ich den Workflow genauer vorstelle, möchte ich mithilfe des Compiler Explorer und des GCC 10 die Ausgabe des Programms präsentieren.

Die Zahlen im Programm infiniteDataStream.cpp stehen für die Schritte in der ersten Iteration des Workflows.

  1. Erzeugt den Promise.
  2. Ruft promise.get_return_object() auf und speichert das Ergebnis des Aufrufs in einer lokalen Variable.
  3. Erzeugt den Generator.
  4. Ruft promise.inital_suspend() auf. Der Generator ist lazy und pausiert damit automatisch.
  5. Frägt nach dem nächsten Wert und gibt zurück, ob die Werte des Generators bereits konsumiert sind.
  6. Wird durch co_yield angestoßen. Danach ist der nächste Wert verfügbar.
  7. Fordert den nächsten Wert an.

In weiteren Iterationen werden nur die Schritte 5 bis 7 wiederholt.

Es ist anspruchsvoll, das den Coroutinen zugrunde liegende Framework zu verstehen. Der einfachste Zugang ist es meines Erachtens, mit existieren Coroutinen zu experimentieren und ihr Verhalten zu beobachten. Die vorgestellte Coroutine, die einen unendlichen Datenstrom erzeugt, ist ein guter Startpunkt für deine Experimente: Verwende dazu den Link auf das ausführbare Programm im Compiler Explorer.

Wie geht's weiter?

Im heutigen Artikel habe ich dank co_yield einen Generator für einen unendlichen Datenstrom vorgestellt. Mein nächster Artikel beschäftigt sich mit der Thread-Synchronisation und co_await.