C++20: Thread-Pools mit cppcoro

Modernes C++  –  0 Kommentare

Der dritte und letzte Artikel der Miniserie zu Lewis Bakers Coroutinen-Abstraktion cppcoro führt Thread-Pools ein.

Der heutige Artikel baut auf meinen zwei vorherigen Artikeln zu cppcoro auf:

Zusätzlich zuer cppcoro::sync_wait-Funktion, mit der sich einfach auf die vollständige Ausführung eines Awaitable warten lässt, bietet cppcoro die interessante cppcoro::when_all-Funktion an.

when_all

when_all: erzeugt ein Awaitable, das auf alle Input-Awaitable wartet und das Aggregat der einzelnen Ergebnisse zurückgibt.

Ich habe die Definition cppcoro::when_all ein wenig vereinfacht. Das folgende Beispiel soll einen ersten Eindruck geben:

// cppcoroWhenAll.cpp

#include <chrono>
#include <iostream>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/when_all.hpp>

using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
std::this_thread::sleep_for(1s); // (3)
co_return "First";
}

cppcoro::task<std::string> getSecond() {
std::this_thread::sleep_for(1s); // (3)
co_return "Second";
}

cppcoro::task<std::string> getThird() {
std::this_thread::sleep_for(1s); // (3)
co_return "Third";
}


cppcoro::task<> runAll() {
// (2)
auto[fir, sec, thi] = co_await cppcoro::when_all(getFirst(), getSecond(), getThird());

std::cout << fir << " " << sec << " " << thi << std::endl;

}

int main() {

std::cout << std::endl;

auto start = std::chrono::steady_clock::now();

cppcoro::sync_wait(runAll()); // (1)

std::cout << std::endl;

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start; // (4)
std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;

std::cout << std::endl;

}

Der Top-Level Task cppcoro::sync_wait(runAll()) (Zeite 1) wartet auf das Awaitable runAll. Dieses wiederum wartet auf die Awaitables getFirst, getSecond und getThird (Zeile 2). Die Awaitables runAll, getFirst, getSecond und getThird sind Coroutinen. Jede der get-Funktionen schläft für eine Sekunde (Zeite 3). Drei mal eins macht drei. Diese drei Sekunden sind genau die Zeit, die der Aufruf cppcoro::sync_wait(runAll()) auf alle Coroutinen wartet. Zeile 4 gibt die Zeitdauer aus:

Das waren die Grundlagen zur Funktion cppcoro::when_all. Jetzt erweitere ich das Beispiel um einen Thread-Pool.

static_thread_pool

static_thread_pool verwaltet (schedule) Arbeitspakete auf einem Thread-Pool fester Länge. cppcoro::static_thread_pool kann mit und ohne Angabe einer Anzahl aufgerufen werden. Die Anzahl steht für die Anzahl der Threads, die erzeugt werden. Falls du die Anzahl nicht angibst, kommt die Funktion std::thread::hardware_concurrency() zum Einsatz. Sie gibt einen Hinweis darauf, wie viele Hardware-Threads auf dem System unterstützt werden. Dies ist meist die Anzahl der Prozessoren oder Kerne, die zur Verfügung stehen.

Das folgende Programm basiert auf dem vorherigen Programm, lediglich die Coroutinen getFirst, getSecond und getThird werden gleichzeitig ausgeführt:

// cppcoroWhenAllOnThreadPool.cpp

#include <chrono>
#include <iostream>
#include <thread>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/task.hpp>
#include <cppcoro/static_thread_pool.hpp>
#include <cppcoro/when_all.hpp>


using namespace std::chrono_literals;

cppcoro::task<std::string> getFirst() {
std::this_thread::sleep_for(1s);
co_return "First";
}

cppcoro::task<std::string> getSecond() {
std::this_thread::sleep_for(1s);
co_return "Second";
}

cppcoro::task<std::string> getThird() {
std::this_thread::sleep_for(1s);
co_return "Third";
}

template <typename Func>
cppcoro::task<std::string> runOnThreadPool(cppcoro::static_thread_pool& tp, Func func) {
co_await tp.schedule();
auto res = co_await func();
co_return res;
}

cppcoro::task<> runAll(cppcoro::static_thread_pool& tp) {

auto[fir, sec, thi] = co_await cppcoro::when_all( // (3)
runOnThreadPool(tp, getFirst),
runOnThreadPool(tp, getSecond),
runOnThreadPool(tp, getThird));

std::cout << fir << " " << sec << " " << thi << std::endl;

}

int main() {

std::cout << std::endl;

auto start = std::chrono::steady_clock::now();

cppcoro::static_thread_pool tp; // (1)
cppcoro::sync_wait(runAll(tp)); // (2)

std::cout << std::endl;

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> elapsed = end - start; // (4)
std::cout << "Execution time " << elapsed.count() << " seconds." << std::endl;

std::cout << std::endl;

}

Dieses sind die entscheidenden Unterschiede zum vorherigen Programm cppcoroWhenAll.cpp. Ich erzeuge in Zeile (1) einen Thread-Pool tp und verwende diesen als Argument der Funktion runAll(tp) (Zeile 2). Die Funktionen runAll verwendet den Thread-Pool, um die Coroutinen gleichzeitig zu starten. Dank Structured Binding (Zeile 3) lassen sich die Werte aller Coroutinen einfach einsammeln und Variablen zuweisen. Das Ausführen des Programms benötigt jetzt eine statt bisher drei Sekunden.

Du weißt es eventuell bereits, dass wir mit C++20 Latches und Barriers erhalten. Beise sind einfache Synchronisationsmechanismen, die es erlauben, Threads zu blockieren, bis ein Zähler den Wert null besitzt. cppcoro bietet auch Latches und Barriers an.

async_latch

async_latch erlaubt es, Coroutinen asynchron zu warten, bis ein Zähler den Wert null besitzt. Das folgende Programm cppcoroLatch.cpp stellt Thread-Synchronisation mit cppcoro::async_latch vor:

// cppcoroLatch.cpp

#include <chrono>
#include <iostream>
#include <future>

#include <cppcoro/sync_wait.hpp>
#include <cppcoro/async_latch.hpp>
#include <cppcoro/task.hpp>

using namespace std::chrono_literals;

cppcoro::task<> waitFor(cppcoro::async_latch& latch) {
std::cout << "Before co_await" << std::endl;
co_await latch; // (3)
std::cout << "After co_await" << std::endl;
}

int main() {

std::cout << std::endl;

cppcoro::async_latch latch(3); // (1)

// (2)
auto waiter = std::async([&latch]{ cppcoro::sync_wait(waitFor(latch)); });

auto counter1 = std::async([&latch] { // (2)
std::this_thread::sleep_for(2s);
std::cout << "counter1: latch.count_down() " << std::endl;
latch.count_down();
});

auto counter2 = std::async([&latch] { // (2)
std::this_thread::sleep_for(1s);
std::cout << "counter2: latch.count_down(2) " << std::endl;
latch.count_down(2);
});

waiter.get(), counter1.get(), counter2.get();

std::cout << std::endl;

}

In Zeile (1) habe ich einen cppcoro::asynch_latch erzeugt und seinen Zähler mit drei initialisiert. Dieses Mal verwende ich std::async (Zeile 2), um die drei Coroutinen gleichzeitig auszuführen. Jeder std::async-Aufruf erhält den Latch per Referenz. Die waitFor-Coroutine warten in Zeile 3, bis der Zähler den Wert null besitzt. Die Coroutine counter1 schläft für zwei Sekunden, bevor sie den Zähler um zwei reduziert. Im Gegensatz dazu schläft counter2 eine Sekunde und reduziert den Zähler um zwei. Der Screenshot zeigt das wechselnde Ausführen der Threads:

Wie geht's weiter?

Bisher habe ich über drei der vier großen Neuerungen in C++20 geschrieben: Concepts, Ranges und Coroutinen. Module fehlen noch in meiner Tour durch die großen Vier in C++20 und werden daher Thema der nächsten Artikel sein.

Eine kleine Anmerkung möchte ich noch loswerden. Wenn du gerne einen Artikel zu einem C++20-Feature schreiben möchtest, das ich noch vorstellen werde, schreibe mir eine E-Mail. Ich freue mich auf den Artikel und werde ihn gegebenenfalls in Englisch/Deutsch übersetzen, falls dies notwendig ist.