Многопоточность в C++. Защёлки и барьеры (latches and barriers)
В C++20 в стандартной библиотеке появились барьеры.
Защелки (latches) и барьеры (barriers) – это механизм синхронизации потоков, который позволяет блокировать любое количество потоков до тех пор, пока ожидаемое количество потоков не достигнет барьера. Защелки нельзя использовать повторно, барьеры можно использовать повторно.
Эти механизмы синхронизации используются, когда выполнение параллельного алгоритма можно разделить на несколько этапов, разделённых барьерами. В частности, с помощью барьера можно организовать точку сбора частичных результатов вычислений, в которой подводится итог этапа вычислений. Например, если стоит задача отфильтровать изображение с помощью двух разных фильтров, и разные потоки фильтруют разные части изображения, то перед началом второй фильтрации следует дождаться, когда первая фильтрация будет полностью завершена, то есть все потоки должны дойти до барьера между двумя этапами фильтрации.
Барьер для группы потоков в исходном коде означает, что каждый поток должен остановиться в этой точке и подождать достижения барьера другими потоками группы. Когда все потоки достигли барьера, их выполнение продолжается.
std::latch
std::latch
– это уменьшающийся счетчик. Значение счетчика инициализируется при создании. Потоки уменьшают значение счётчика и блокируются на защёлке до тех пор, пока счетчик не уменьшится до нуля. Нет возможности увеличить или сбросить счетчик, что делает защелку одноразовым барьером.
В отличие от std::barrier
, std::latch
может быть уменьшен одним потоком более одного раза.
Использовать защёлки очень просто. В нашем распоряжении несколько методов:
count_down(value)
уменьшает значение счётчика наvalue
(по умолчанию 1) без блокировки потока. Если значение счётчика становится отрицательным, то поведение не определено.try_wait()
позволяет проверить, не достигло ли значение счётчика нуля. С низкой вероятностью может ложно возвращатьfalse
.wait()
блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика уже равно 0, то управление возвращается немедленно.arrive_and_wait(value)
уменьшает значение счётчика наvalue
(по умолчанию 1) и блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика становится отрицательным, то поведение не определено.
Пример:
void DoWork(threadpool* pool) {
latch completion_latch(NTASKS);
for (int i = 0; i < NTASKS; ++i) {
pool->add_task([&] {
// выполняем работу
...
completion_latch.count_down();
}));
}
// блокируем, пока работа не будет выполнена
completion_latch.wait();
}
std::barrier
Используется почти так же, как std::latch
, но является многоразовым: как только ожидающие потоки разблокируются, значение счётчика устанавливается в начальное, и тот же самый барьер может быть использован повторно.
Работу барьера можно разбить на фазы. Фаза заканчивается, когда счётчик барьера обнуляется, затем начинается новая фаза. Фазы работы имеют идентификаторы, которые возвращаются некоторыми методами. Это нужно для того, чтобы мы не ждали конца фазы, которая уже завершена.
Итак, как пользоваться барьерами? В нашем распоряжении следующие методы:
arrive(value)
уменьшает текущее значение счётчика наvalue
(по умолчанию 1). Поведение не определено, если значение счётчика станет отрицательным. Метод возвращает идентификатор фазы, который имеет типarrival_token
.wait(arrival_token)
блокирует текущий поток до тех пор, пока указанная фаза не завершится. Принимает идентификатор фазы в качестве параметра.arrive_and_wait()
уменьшает текущее значение счётчика на 1 и блокирует текущий поток до тех пор, пока счётчик не обнулится. Эквивалентно вызовуwait(arrive());
. Поведение не определено, если вызов происходит, когда значение счётчика равно нулю. Поэтому количество потоков, уменьшающих счётчик барьера, не должно быть больше начального значения счётчика.arrive_and_drop()
уменьшает на 1 начальное значение счётчика для следующих фаз, а так же текущее значение счётчика. Поведение не определено, если вызов происходит, когда значение счётчика равно нулю.
Пример:
int n_threads;
std::vector<thread*> workers;
std::barrier task_barrier(n_threads);
for (int i = 0; i < n_threads; ++i) {
workers.push_back(new thread([&] {
for(int step_no = 0; step_no < 5; ++step_no) {
// выполнение фазы
...
task_barrier.arrive_and_wait();
}
});
}