Многопоточность в C++. Условные переменные (condition variables)

Добавлено 31 декабря 2021 в 15:00

Представьте, что вы едете в ночном поезде. Чтобы гарантированно сойти на нужной станции, придется не спать всю ночь и внимательно отслеживать все остановки. Свою станцию вы не пропустите, но сойдете с поезда уставшим. Но есть и другой способ: заглянуть в расписание, увидеть предполагаемое время прибытия поезда на нужную станцию, поставить будильник на нужное время с небольшим запасом и лечь спать. Этого будет вполне достаточно, и вы не пропустите свою станцию, но, если поезд задержится, пробуждение окажется слишком ранним. Идеальным решением было бы лечь спать, попросив проводника разбудить вас на нужной станции.

Какое отношение всё это имеет к потокам? Если какой-то поток ожидает, пока другой поток завершит выполнение своей задачи, есть несколько вариантов развития событий. Во-первых, первый поток может постоянно проверять состояние флага в совместно используемых данных, защищенных мьютексом, а второй поток будет обязан установить флаг по завершении своей задачи. Это весьма накладно по двум соображениям: постоянно проверяя состояние флага, поток впустую тратит ценное процессорное время, а когда мьютекс заблокирован ожидающим потоком, его нельзя заблокировать никаким другим потоком. Второй вариант предполагает введение ожидающего потока в спящий режим на короткий промежуток времени между проверками с помощью функции std::this_thread::sleep_for(). Это уже гораздо лучше, поскольку поток, находясь в спящем режиме, не тратит процессорное время впустую, но хороший период пребывания в нем подобрать довольно трудно. Слишком короткий период спячки между проверками – и поток по-прежнему тратит впустую время процессора на слишком частые проверки, слишком длинный период спячки – и поток выйдет из нее позже положенного, что приведет к ненужной задержке. Позднее пробуждение напрямую влияет на работу программы довольно редко, но в приложении реального времени это может быть критичным. Третьим и наиболее предпочтительным вариантом является использование средств из стандартной библиотеки C++, предназначенных для ожидания наступления какого-либо события. Основным механизмом для реализации такого ожидания является условная переменная. Концептуально она связана с каким-либо условием, и один или несколько потоков могут ожидать выполнения этого условия. Когда другой поток обнаружит, что условие выполнено, он может известить об этом один или несколько потоков, ожидающих условную переменную, чтобы разбудить их и позволить продолжить работу.

Стандартная библиотека C++ предоставляет не одну, а две реализации условной переменной: std::condition_variable и std::condition_variable_any. Обе они объявлены в заголовке <condition_variable>. В обоих случаях для соответствующей синхронизации им нужно работать с мьютексом: первая реализация ограничивается работой только с std::mutex, а вторая может работать с любыми типами, которые работают как мьютекс, о чем свидетельствует суффикс _any. Если не требуется дополнительная гибкость, предпочтение следует отдавать реализации std::condition_variable.

std::condition_variable

Класс condition_variable – это примитив синхронизации, который может использоваться для блокировки потока или нескольких потоков до тех пор, пока другой поток не изменит общую переменную (не выполнит условие) и не уведомит об этом condition_variable.

Поток, который намеревается изменить общую переменную, должен:

  • захватить std::mutex (обычно через std::lock_guard);
  • выполнить модификацию, пока удерживается блокировка мьютекса;
  • выполнить notify_one или notify_all на std::condition_variable (блокировка не должна удерживаться для уведомления).

Даже если общая переменная является атомарной, всё равно требуется использовать мьютекс для корректного оповещения ожидающих потоков.

Любой поток, который ожидает наступления события от std::condition_variable, должен:

  • с помощью std::unique_lock<std::mutex> получить блокировку того же мьютекса, который используется для защиты общей переменной;
  • проверить, что необходимое условие ещё не выпонлено;
  • вызвать метод wait, wait_for или wait_until. Операции ожидания освобождают мьютекс и приостанавливают выполнение потока;
  • когда получено уведомление, истёк тайм-аут или произошло ложное пробуждение, поток пробуждается, и мьютекс повторно блокируется. Затем поток должен проверить, что условие, действительно, выполнено, и возобновить ожидание, если пробуждение было ложным.

Вместо трёх последних шагов можно воспользоваться перегрузкой методов wait, wait_for и wait_until, которая принимает предикат для проверки условия и выполняет три последних шага.

std::condition_variable работает только с std::unique_lock<std::mutex>; это ограничение обеспечивает максимальную эффективность на некоторых платформах. std::condition_variable_any работает с любым BasicLockable объектом, например, с std::shared_lock.

Condition variables допускают одновременный вызов методов wait, wait_for, wait_until, notify_one и notify_all из разных потоков.

Пример использования:

#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
 
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
 
void worker_thread()
{
    // ждем, пока main() не отправит данные
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{return ready;});
 
    // после ожидания мы владеем блокировкой
    std::cout << "Worker thread is processing data\n";
    data += " after processing";
 
    // отправляем данные обратно в main()
    processed = true;
    std::cout << "Worker thread signals data processing completed\n";
 
    // Ручная разблокировка выполняется перед уведомлением,
    // чтобы не разбудить ожидающий поток только для повторной блокировки
    // (подробности смотрите в разделе о notify_one)
    lk.unlock();
    cv.notify_one();
}
 
int main()
{
    std::thread worker(worker_thread);
 
    data = "Example data";
    // отправляем данные в обрабатывающий поток
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
    }
    cv.notify_one();
 
    // ждем обработчика
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
    }
    std::cout << "Back in main(), data = " << data << '\n';
 
    worker.join();
}
/*
Возможный вывод:
main() signals data ready for processing
Worker thread is processing data
Worker thread signals data processing completed
Back in main(), data = Example data after processing
*/

std::condition_variable_any

Этот тип условной переменной имеет такой же интерфейс, как std::condition_variable, но может использоваться не только с std::unique_lock<std::mutex>, а с любым блокируемым типом. Работает медленее, чем std::condition_variable. Может использоваться, например, для работы с std::shared_lock.

std::notify_all_at_thread_exit

Стандартная библиотека предоставляет ещё одну функцию для использования в ситуациях, когда мы с помощью condition_variable хотим дождаться завершения потока.

Зачем это нужно?

Допустим, мы хотим дождаться завершения detached потока, в этом случае мы не можем использовать метод join для ожидания завершения потока. Тогда мы решаем, что нужно использовать condition_variable для уведомления о том, что поток завершается. Но если мы просто в конец функции, выполняемой в отдельном потоке, добавим cv.notify_all();, то получим поведение, отличное от того, которое нам нужно. Несмотря на то, что эта команда будет последней в функции потока, поток на ней ещё не заканчивает выполнение. После вызова notify_all в этом же потоке будет происходить уничтожение thread_local переменных, будут вызываться их деструкторы и выполняться какие-либо действия. То есть, на самом деле, уведомление было отправлено ещё до того, как поток завершился.

Тогда как на самом деле дождаться полного завершения detached потока? Для этого стандартная библиотека предоставляет функцию std::notify_all_at_thread_exit. Она дожидается завершения потока, в том числе уничтожения thread_local переменных, и последними действиями в потоке выполняет:

lk.unlock();
cond.notify_all();

Эквивалентный эффект может быть достигнут с помощью средств, предоставляемых std::promise или std::packaged_task.

Пример использования:

#include <mutex>
#include <thread>
#include <condition_variable>
 
#include <cassert>
#include <string>
 
std::mutex m;
std::condition_variable cv;
 
bool ready = false;
std::string result; // какой-то произвольный тип
 
void thread_func()
{
    thread_local std::string thread_local_data = "42";
 
    std::unique_lock<std::mutex> lk(m);
 
    // присваиваем значение переменной result, используя thread_local данные
    result = thread_local_data;
    ready = true;
 
    std::notify_all_at_thread_exit(cv, std::move(lk));
 
}   // 1. уничтожаются thread_local переменные;
    // 2. разблокируется мьютекс;
    // 3. уведомляется cv.
 
int main()
{
    std::thread t(thread_func);
    t.detach();
 
    // выполняем другую работу
    // ...
 
    // ждем открепленного потока
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return ready; });
 
    // результат готов, деструкторы thread_local переменных завершены,
    // нет неопределенного поведения
    assert(result == "42");
}

Теги

C++ / CppMutex / Мьютексstd::condition_variablestd::condition_variable_anystd::mutexstd::notify_all_at_thread_exitSTL / Standard Template Library / Стандартная библиотека шаблоновМногопоточностьУсловная переменная / Condition variable

На сайте работает сервис комментирования DISQUS, который позволяет вам оставлять комментарии на множестве сайтов, имея лишь один аккаунт на Disqus.com.

В случае комментирования в качестве гостя (без регистрации на disqus.com) для публикации комментария требуется время на премодерацию.