C++20: Mit Coroutinen einen Future in einem eigenen Thread ausführen

Modernes C++ Rainer Grimm  –  215 Kommentare

Mit diesem Artikel beende ich meine Ausführungen zu co_return in C++20 ab. Der erste Artikel beschäftigte sich mit einem Future, der sofort ausgeführt wurde. Es folgte ein Future, der nur bei Bedarf startete. Heute werde ich das Future dank Coroutinen in seinem eigenen Thread ausführen.

Bevor ich diesen Artikel beginne, möchte ich meine Motivation zu dieser Miniserie über Coroutinen in C++20 nochmals zum Ausdruck bringen: Mir geht es darum, durch einfache Beispiele einen ersten Zugang zu den komplizierten Arbeitsabläufen von Coroutinen anzubieten. Diese aufeinander aufbauenden Artikel habe ich bereits in dieser Miniserie geschrieben:

Jetzt geht es darum, das Future in einem separaten Thread auszuführen.

Ausführung in einem separaten Thread

Die Coroutine im ersten Artikel pausierte, bevor ihr Funktionskörper ausgeführt wurde:

MyFuture<int> createFuture() {
std::cout << "createFuture" << '\n';
co_return 2021;
}

Der Grund war, dass die Funktion initial_suspend den Promise std::suspend_always zurückgab. Das heißt, dass die Coroutine zuerst pausiert und dadurch auf einem separaten Thread ausgeführt werden kann:

// lazyFutureOnOtherThread.cpp

#include <coroutine>
#include <iostream>
#include <memory>
#include <thread>

template<typename T>
struct MyFuture {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
handle_type coro;

MyFuture(handle_type h): coro(h) {}
~MyFuture() {
if ( coro ) coro.destroy();
}

T get() { // (1)
std::cout << " MyFuture::get: "
<< "std::this_thread::get_id(): "
<< std::this_thread::get_id() << '\n';

std::thread t([this] { coro.resume(); }); // (2)
t.join();
return coro.promise().result;
}

struct promise_type {
promise_type(){
std::cout << " promise_type::promise_type: "
<< "std::this_thread::get_id(): "
<< std::this_thread::get_id() << '\n';
}
~promise_type(){
std::cout << " promise_type::~promise_type: "
<< "std::this_thread::get_id(): "
<< std::this_thread::get_id() << '\n';
}

T result;
auto get_return_object() {
return MyFuture{handle_type::from_promise(*this)};
}
void return_value(T v) {
std::cout << " promise_type::return_value: "
<< "std::this_thread::get_id(): "
<< std::this_thread::get_id() << '\n';
std::cout << v << std::endl;
result = v;
}
std::suspend_always initial_suspend() {
return {};
}
std::suspend_always final_suspend() noexcept {
std::cout << " promise_type::final_suspend: "
<< "std::this_thread::get_id(): "
<< std::this_thread::get_id() << '\n';
return {};
}
void unhandled_exception() {
std::exit(1);
}
};
};

MyFuture<int> createFuture() {
co_return 2021;
}

int main() {

std::cout << '\n';

std::cout << "main: "
<< "std::this_thread::get_id(): "
<< std::this_thread::get_id() << '\n';

auto fut = createFuture();
auto res = fut.get();
std::cout << "res: " << res << '\n';

std::cout << '\n';

}

Ich habe das Programm ein wenig kommentiert, sodass vor allem die ID des ausgeführten Threads dargestellt wird. Das Programm lazyFutureOnOtherThread.cpp ist dem vorherigen Programm lazyFuture.cpp des zweiten Artikels sehr ähnlich. Den größten Unterschied stellt die Funktion get (Zeile 1) dar. Der Aufruf std::thread t([this] { coro.resume(); }) (Zeile 2) führt die Coroutine in einem anderen Thread fort.

Auf dem Wandbox Online-Compiler lässt sich das Programm direkt ausprobieren:

Ich möchte gerne noch ein paar Bemerkungen zur Funktion get hinzufügen. Es ist wichtig, dass der Promise, der in einem anderen Thread ausgeführt wird, fertig ist, bevor er sein Ergebnis mittels coro.promise().result; zurückgibt:

T get() {
std::thread t([this] { coro.resume(); });
t.join();
return coro.promise().result;
}

Wird der Thread gejoinet, nachdem coro.promise().result aufgerufen wurde, ist dies undefiniertes Verhalten. In der folgenden Implementierung verwende ich std::jthread. Hierzu habe ich bereits einen Artikel geschrieben: "Ein verbesserter Thread mit std::jthread". Da std::jthread automatisch joint, falls er seinen Gültigkeitsbereich verlässt, ist dies zu spät:

T get() {   
std::jthread t([this] { coro.resume(); });
return coro.promise().result;
}

In diesem Fall ist es sehr wahrscheinlich, dass der Client das Ergebnis erhält, bevor der Promise seinen Aufruf return_value abgeschlossen hat. Nun besitzt value und damit res einen zufälligen Wert.

Natürlich gibt es mehr Möglichkeiten sicherzustellen, dass der Thread vor dem return-Aufruf fertig ist.

  • std::jthread erhält einen eigenen Gültigkeitsbereich:
T get() {
{
std::jthread t([this] { coro.resume(); });
}
return coro.promise().result;
}
  • std::jthread wird ein temporäres Objekt:
T get() {
std::jthread([this] { coro.resume(); });
return coro.promise().result;
}

Insbesondere die letzte Lösung finde ich problematisch, denn man benötigt wohl ein paar Sekunden, um zu erkennen, dass durch das temporäre Objekt der Konstruktor von std::jthread implizit aufgerufen wird.

promise_type

Mancher mag sich wundern, dass eine Coroutine wie MyFuture immer einen inneren Typ promise_type besitzt. Dieser Name ist notwendig. Entsprechend lässt sich auch std::coroutines_traits für MyFuture spezialisieren und in ihm einen öffentlichen promise_type erzeugen. Ich erwähne diesen Punkt, da ich weiß, dass viele Entwickler – auch ich – bereits in diese Falle getappt sind.

Hier nun eine weitere Falle.

return_void and return_value

Der Promise benötigt entweder die Funktion return_void oder return_value. Der erste Fall tritt ein, falls

  • die Coroutine keine co_return-Anweisung besitzt.
  • die Coroutine eine co_return-Anweisung ohne Argument besitzt.
  • die Coroutine eine co_return expression-Anweisung besitzt, in der expression den Typ void hat.

Der Promise braucht die Funktion return_value, falls die Coroutine eine co_return expression-Anweisung besitzt, in der expression nicht den Typ void hat.

Wird das Ende einer Coroutine erreicht, die void zurückgibt, aber keine return_void-Funktion besitzt, ist das undefiniertes Verhalten. Interessanterweise verlangt der Microsoft-Compiler, aber nicht der GCC eine Funktion return_void, wenn die Coroutine immer in ihrem letzten Haltepunkt pausiert und damit nie ihr Ende erreicht: std::suspend_always final_suspend() noexcept. Aus meiner Sicht ist der C++20-Standard in diesem Aspekt nicht eindeutig, und ich füge meinem Promise sicherheitshalber immer eine Funktion void return_void() {} hinzu.

Wie geht's weiter?

Nach meinen Artikeln zu co_return geht es mit co_yield weiter. Dieses erlaubt es, unendliche Datenströme zu erzeugen. In meinem nächsten Artikel gehe ich darauf genauer ein.