Daily bit(e) C++. std::condition_variable, std::condition_variable_any
Daily bit(e) of C++ 20. Инструменты для безопасного ожидания общего состояния: std::condition_variable и std::condition_variable_any.

std::condition_variable и std::condition_variable_any позволяют нескольким потокам безопасно ожидать общего состояния.
Условные переменные принимают полученную блокировку и условие и позволяют потоку продолжить работу только тогда, когда и блокировка получена, и условие истинно.
std::condition_variable поддерживает только std::unique_lock, std::condition_variable_any поддерживает std::stop_token и будет работать с любой блокировкой, предоставляющей методы lock() и unlock().
Поток, желающий изменить общее состояние, должен получить блокировку, изменить состояние, а затем вызвать либо notify_one() (чтобы разбудить один поток, ожидающий эту условную переменную), либо notify_all() (чтобы разбудить все потоки).
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stop_token>
struct Resource
{
// общее состояние
bool full = false;
std::mutex mux;
std::condition_variable_any cond;
void produce(std::stop_token token)
{
{
std::unique_lock lock(mux);
// ждать, пока условие не будет равно true
// 1. блокировка освобождена
// 2. если поток проснулся, блокировка запрашивается снова
// 3. если была запрошена остановка,
// вызов wait() завершается, возвращая false
// 4. если остановка не была запрошена, проверяется условие
// 5. если условие всё ещё не равно true,
// блокировка освобождается, и мы переходим к шагу 2.
// 6. если условие равно true,
// вызов wait() завершается, возвращая true
if (cond.wait(lock, token, [this]{ return !full; }))
{
// ресурс пуст
}
else
{
// была запрошена остановка
return;
}
// При ожидании выхода блокировка удерживается
full = true; // безопасно
}
// Разбудить один потока, ожидающий эту условную переменную.
// Обратите внимание, что мы уже освободили нашу блокировку, иначе
// оповещенный поток проснется, но не сможет получить блокировку
// и снова приостановит свою работу.
cond.notify_one();
}
void consume(std::stop_token token)
{
{
std::unique_lock lock(mux);
// то же самое, что и выше, но с противоположной семантикой
if (cond.wait(lock, token, [this]{ return full; }))
{
// ресурс полон
}
else
{
// была запрошена остановка
return;
}
full = false;
}
cond.notify_one();
}
};
Пример на Compiler Explorer
#include <thread>
#include <mutex>
#include <condition_variable>
#include <stop_token>
#include <print>
using namespace std::chrono_literals;
struct Resource
{
// общее состояние
bool full = false;
std::mutex mux;
std::condition_variable_any cond;
void produce(std::stop_token token)
{
{
std::unique_lock lock(mux);
// ждать, пока условие не будет равно true
// 1. блокировка освобождена
// 2. если поток проснулся, блокировка запрашивается снова
// 3. если была запрошена остановка,
// вызов wait() завершается, возвращая false
// 4. если остановка не была запрошена, проверяется условие
// 5. если условие всё ещё не равно true,
// блокировка освобождается, и мы переходим к шагу 2.
// 6. если условие равно true,
// вызов wait() завершается, возвращая true
if (cond.wait(lock, token, [this]{ return !full; }))
{
std::println("Producer running.");
}
else
{
std::println("Producer stopping.");
return;
}
// При ожидании выхода блокировка удерживается
std::println("Filling the resource and notifying the consumer.");
full = true; // безопасно
std::this_thread::sleep_for(200ms);
}
// Разбудить один потока, ожидающий эту условную переменную.
// Обратите внимание, что мы уже освободили нашу блокировку, иначе
// оповещенный поток проснется, но не сможет получить блокировку
// и снова приостановит свою работу.
cond.notify_one();
}
void consume(std::stop_token token)
{
{
std::unique_lock lock(mux);
// то же самое, что и выше, но с противоположной семантикой
if (cond.wait(lock, token, [this]{ return full; }))
{
std::println("Consumer running.");
}
else
{
std::println("Consumer stopping.");
return;
}
std::println("Consuming the resource and notifying the producer.");
full = false;
std::this_thread::sleep_for(200ms);
}
cond.notify_one();
}
};
int main()
{
Resource resource;
// Запуск потребителя и производителя
auto t1 = std::jthread([&resource](std::stop_token token){
while (!token.stop_requested())
resource.produce(token);
});
auto t2 = std::jthread([&resource](std::stop_token token){
while (!token.stop_requested())
resource.consume(token);
});
std::this_thread::sleep_for(2s);
// Запрашиваем у обоих остановку
t1.request_stop();
t2.request_stop();
}
