Многопоточность в C++. Возврат значений и проброс исключений
Предположим, что имеются какие-то длительные вычисления, которые, как ожидается, вернут со временем полезный результат, значение которого вам пригодится позже. Для выполнения вычислений можно запустить новый поток, но это будет означать, что следует позаботиться о передаче результата в основную программу, поскольку std::thread
не предоставляет непосредственного механизма для выполнения этой задачи.
Стандартная библиотека предоставляет средства для получения возвращаемых значений и перехвата исключений, создаваемых асинхронными задачами (т. е. функциями, запущенными в отдельных потоках). Эти значения передаются через общие объекты состояния выполнения, в которые асинхронная задача может записать свое возвращаемое значение или сохранить исключение, и которые могут быть проверены другими потоками, содержащими экземпляры std::future
или std::shared_future
, ссылающиеся на это общее состояние.
Кроме непосредственно механизма возврата значений с помощью std::future
и std::promise
, стандартная библиотека предоставляет более высокоуровневые средства для запуска задач, которые должны вернуть значение. std::packaged_task
является обёрткой для ваших функций, которая позволяет автоматизировать сохранение результата в std::promise
. А std::async
является наиболее высокоуровневым инструментом для автоматического запуска задачи в отдельном потоке с возможностью позже запросить результат выполнения. Начнём рассмотрение с базовых низкоуровневых инструментов, чтобы понимать механику работы.
Низкоуровневые средства: возврат значений и проброс исключений с помощью std::future
и std::promise
std::promise
– это базовый механизм, позволяющий передавать значение между потоками. Каждый объект std::promise
связан с объектом std::future
. Это пара классов, один из которых (std::promise
) отвечает за установку значения, а другой (std::future
) – за его получение. Первый поток может ожидать установки значения с помощью вызова метода std::future::wait
или std::future::get
, в то время, как второй поток установит это значение с помощью вызова метода std::promise::set_value
, или передаст первому исключение вызовом метода std::promise::set_exception
.
Возврат значения
Шаблон класса std::promise
предоставляет средство для сохранения значения или исключения, которое позже асинхронно забирается через объект std::future
, созданный объектом std::promise
.
Шаблон класса std::future
предоставляет механизм доступа к результату асинхронных операций.
Пара объектов (std::promise
и связанный с ним std::future
) образуют канал связи между потоками. std::promise
предоставляет операцию push для этого канала связи. Значение, записанное с помощью promise
, может быть прочитано с помощью объекта future
.
Каждый объект promise
связан с общим состоянием выполнения, которое может быть еще не установлено или может хранить значение или исключение. Когда асинхронная операция готова вернуть результат, она может сделать это, изменив общее состояние (например, с помощью метода std::promise::set_value
). Объект std::future
(его можно получить с помощью метода std::promise::get_future
) связывается с этим же самым общим состоянием. Поток, запустивший асинхронную операцию может затем использовать различные методы для проверки, ожидания готовности или извлечения значения из std::future
. Эти методы могут блокировать выполнение, если асинхронная операция еще не предоставила значение.
Сохранение результата или исключения в std::promise
приводит операцию в состояние готовности. Эта операция разблокирует поток, ожидающий результата. Если объект promise
был уничтожен, а результат (значение или исключение) не был сохранён, то сохраняется исключение типа std::future_error
с кодом ошибки std::future_errc::broken_promise
, происходит приведение в состояние готовности.
Обратите внимание:
- объект
std::promise
предназначен для использования только один раз, запросить значение (get()
) изstd::future
можно только один раз; - с помощью
std::future
результата может дожидаться только один поток.. Параллельный доступ к одному и тому же общему состоянию может приводить к конфликтам.
Итак, как этим пользоваться?
Шаблон std::promise<T>
позволяет устанавливать значение (типа T
), которое позже можно прочитать через связанный объект std::future<T>
. Ожидающий поток может заблокироваться на фьючерсе, а поток, предоставляющий данные,– воспользоваться другой половиной пары, промисом (promise
, иногда называют обещанием), для установки связанного значения и приведения фьючерса в состояние готовности. Получить объект фьючерса std::future
, связанный с заданным объектом std::promise
, можно вызовом метода get_future()
. Когда значение в promise
установлено (с помощью метода set_value()
), фьючерс приводится в состояние готовности и может использоваться для извлечения сохраненного значения. Если объект std::promise
уничтожить без установки значения, вместо него будет сохранено исключение.
В нашем распоряжении есть несколько методов:
std::promise
:get_future()
позволяет получить объектstd::future
, связанный с нашим объектомstd::promise
;set_value(value)
сохраняет значение, которое можно запросить с помощью связанного объектаstd::future
;set_exception(exception)
сохраняет исключение, которое будет брошено в потоке, запросившем значение из объектаstd::future
;set_value_at_thread_exit()
иset_exception_at_thread_exit()
сохраняют значение или исключение после завершения потока аналогично тому, как работаетstd::notify_all_at_thread_exit
;
std::future
:get()
дожидается, когда promise сохранит результат, и возвращает его. После вызова этого метода объектfuture
удаляет ссылку на общее состояние, и методvalid()
начинает возвращатьfalse
. Вызов для невалидного (valid()
возвращаетfalse
) объекта приводит к неопределённому поведению или исключению (зависит от реализации). Если вpromise
было записано исключение, то оно будет брошено при вызове;valid()
проверяет, связан ли объектfuture
с каким-то общим состоянием. Вызов других методов для невалидного объекта приводит к неопределённому поведению или исключению (зависит от реализации);wait()
блокирует текущий поток, покаpromise
не запишет значение. Вызов для невалидного (valid()
возвращаетfalse
) объекта приводит к неопределённому поведению или исключению (зависит от реализации);wait_for()
иwait_until()
работают аналогично методуwait
, но с ограничением на время ожидания. Возвращаютfuture_status
;share()
конструирует и возвращаетshared_future
. Несколько объектовstd::shared_future
могут ссылаться на одно и то же общее состояние, что невозможно дляstd::future
. После вызова метода объектfuture
удаляет ссылку на общее состояние, и методvalid()
начинает возвращатьfalse
.
Пример:
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // оповестить future
}
int main()
{
// Демонстрация использования promise<int> для передачи результата между потоками.
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::move(accumulate_promise));
// future::get() будет ждать, пока future не будет иметь допустимый результат,
// после чего запросит его.
// Вызов wait() перед get() не требуется.
//accumulate_future.wait(); // ждать результата
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join(); // ждем завершения потока
}
Проброс исключения
Предположим, что вызываемая в отдельном потоке функция может выдавать исключение:
double square_root(double x){
if(x<0) {
throw std::out_of_range("x<0");
}
return sqrt(x);
}
Если в функцию square_root()
передается значение -1, она выдает исключение, которое становится видимым вызывающему коду. В идеале при выполнении этой функции в отдельном потоке хотелось бы получить точно такое же поведение, как при однопоточном варианте выполнения: было бы неплохо, чтобы код, вызвавший future::get()
, мог видеть исключение.
std::promise
предоставляет возможности сохранить исключение. Если вместо значения требуется сохранить исключение, то вместо set_value()
вызывается метод set_exception()
. Исключение сохраняется во фьючерсе на месте сохраненного значения, фьючерс приводится в состояние готовности и вызов get()
бросает сохраненное исключение. (Примечание: в стандарте не указано, является ли повторно выдаваемое исключение исходным объектом исключения или его копией, разные компиляторы и библиотеки делают выбор по своему усмотрению.)
Обычно для исключения, выдаваемого в качестве части алгоритма, это делается в блоке catch
:
extern std::promise<double> some_promise;
try{
some_promise.set_value(square_root(x));
}
catch(...){
some_promise.set_exception(std::current_exception());
}
some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo ")));
Такой код выглядит намного понятнее, чем код с применением блока try
-catch
, – это не только упрощает код, но и расширяет возможности компилятора в области оптимизации кода.
То же самое происходит, если функция заключена в std::packaged_task
: когда при вызове задачи этой функцией выдается исключение, оно сохраняется во фьючерсе на месте результата, готового к выдаче при вызове функции get()
.
Аналогичное поведение может быть достигнуто с помощью std::async
:
std::future<double> f = std::async(square_root, -1);
double y = f.get();
Еще один способ сохранения исключения во фьючерсе заключается в уничтожении связанного с фьючерсом объекта std::promise
или объекта std::packaged_task
без вызова каких-либо set-функций в отношении promise
или без обращения к упакованной задаче. В этом случае деструктор std::promise
или std::packaged_task
сохранит исключение std::future_error
с кодом ошибки std::future_errc::broken_promise
в связанном состоянии, если фьючерс еще не перешел в состояние готовности: созданием фьючерса дается обещание предоставить значение или исключение, а уничтожением источника этого значения или исключения без их предоставления это обещание нарушается. Если бы компилятор в таком случае ничего не сохранял во фьючерсе, ожидающие потоки могли бы ожидать бесконечно.
Передача событий без состояния
promise
-future
можно использовать не только для передачи значения, но и просто для уведомления (хотя для этого можно использовать condition variables), если сохранить тип void
. Например, можно сделать барьер (в С++20 для этого есть специальные средства).
Пример:
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void do_work(std::promise<void> barrier)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
barrier.set_value();
}
int main()
{
// Демонстрация использования promise<void>
// для сигнализации состояния между потоками.
std::promise<void> barrier;
std::future<void> barrier_future = barrier.get_future();
std::thread new_work_thread(do_work, std::move(barrier));
barrier_future.wait();
new_work_thread.join();
}
Среднеуровневые средства: обёртка для функций и callable объектов std::packaged_task
Использование promise
– это не единственный способ возврата значения из функции, выполняемой в другом потоке. Сделать это можно также заключением задачи в экземпляр std::packaged_task<>
. Шаблон класса std::packaged_task
является абстракцией более высокого уровня, чем std::promise
.
Шаблон класса std::packaged_task
обёртывает любую вызываемую цель (функцию, лямбда-выражение, bind expression или другой callable объект), чтобы ее можно было вызвать асинхронно с получением возвращаемого значения или исключения. Возвращаемое значение или вызванное исключение хранится в общем состоянии, доступ к которому можно получить через объекты std::future
.
std::packaged_task
работает так же, как если бы мы создали объект std::promise
и сохранили в него результат работы функции.
Шаблон класса std::packaged_task<>
привязывает фьючерс к функции или вызываемому объекту. Когда вызывается объект std::packaged_task<>
, он вызывает связанную функцию или объект и приводит фьючерс в состояние готовности после возврата функцией значения или броска исключения. Этим классом можно воспользоваться как строительным блоком для пула потоков или других схем управления задачами, например, для запуска всех задач в специально выделенном потоке, работающем в фоновом режиме. Таким образом удается абстрагироваться от подробностей задач – диспетчер имеет дело только с экземплярами std::packaged_task
, а не с отдельно взятыми функциями.
Параметром шаблона для std::packaged_task<>
является сигнатура функции, например void()
для функции, не получающей параметры и не имеющей возвращаемых значений, или int(std::string&,double*)
для функции, получающей не-const-ссылку на std::string
и указатель на double
и возвращающей значение типа int
. При создании экземпляра std::packaged_task
ему следует передать функцию или вызываемый объект, принимающий указанные параметры, а затем возвращающий тип, который можно преобразовать в указанный тип возвращаемого значения. Точного совпадения типов не требуется, можно сконструировать объект std::packaged_task<double(double)>
из функции, принимающей значение типа int
и возвращающей значение типа float
, поскольку возможно неявное приведение типов. Тип возвращаемого значения, указанный в сигнатуре функции, определяет тип объекта std::future<>
, возвращаемого методом get_future()
, а заданный в сигнатуре список аргументов используется для определения сигнатуры оператора вызова в классе packaged_task
.
Объект std::packaged_task
является вызываемым, значит, его можно обернуть объектом std::function
или передать конструктору std::thread
в качестве функции потока, или даже вызвать напрямую.
Когда std::packaged_task
вызывается, аргументы, предоставленные оператору вызова функции, передаются содержащейся в этом объекте функции, а возвращаемое значение сохраняется в качестве результата в объекте std::future
, полученном от get_future()
.Таким образом, задачу можно заключить в объект std::packaged_task
и извлечь фьючерс перед передачей объекта std::packaged_task
в отдельный поток. Когда понадобится результат, можно будет дождаться готовности фьючерса.
Итак, как это использовать?
В нашем распоряжении несколько методов:
get_future()
позволяет получить связанный с состоянием задачи объектstd::future
, с помощью которого можно получить возвращаемое значение функции или брошенное исключение;operator()
позволяет вызвать обёрнутую функцию, нужно передать аргументы функции;make_ready_at_thread_exit()
позволяет дождаться полного завершения потока перед тем, как привестиfuture
в состояние готовности;reset()
очищает результаты предыдущего запуска задачи.
Пример:
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
// уникальная функция, позволяющая избежать неоднозначности
// набора перегрузки std::pow
int f(int x, int y) { return std::pow(x,y); }
void task_lambda()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}
void task_bind()
{
std::packaged_task<int()> task(std::bind(f, 2, 11));
std::future<int> result = task.get_future();
task();
std::cout << "task_bind:\t" << result.get() << '\n';
}
void task_thread()
{
std::packaged_task<int(int,int)> task(f);
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
int main()
{
task_lambda();
task_bind();
task_thread();
}
Пример с ожиданием полного завершения потока:
#include <future>
#include <iostream>
#include <chrono>
#include <thread>
#include <functional>
#include <utility>
void worker(std::future<void>& output)
{
std::packaged_task<void(bool&)> my_task{ [](bool& done) { done=true; } };
auto result = my_task.get_future();
bool done = false;
my_task.make_ready_at_thread_exit(done); // выполнить задачу сразу
std::cout << "worker: done = " << std::boolalpha << done << std::endl;
auto status = result.wait_for(std::chrono::seconds(0));
if (status == std::future_status::timeout)
std::cout << "worker: result is not ready yet" << std::endl;
output = std::move(result);
}
int main()
{
std::future<void> result;
std::thread{worker, std::ref(result)}.join();
auto status = result.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready)
std::cout << "main: result is ready" << std::endl;
}
Пример со сбросом результатов предыдущего выполнения:
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
int main()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
task(2, 9);
std::cout << "2^9 = " << result.get() << '\n';
task.reset();
result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "2^10 = " << result.get() << '\n';
}
Высокоуровневые средства: запуск задач асинхронно с помощью std::async
Всё, что было описано выше – это хорошо, но может казаться слишком сложным для того, чтобы просто запустить задачу в отдельном потоке и получить значение. Иногда хочется иметь ещё более высокоуровневые инструменты и запускать задачи в одну строчку кода. Стандартная библиотека C++ предоставляет такую возможность.
std::async
запускает функцию f
асинхронно (потенциально в отдельном потоке, который может быть частью пула потоков) и возвращает std::future
, который в конечном итоге будет содержать результат вызова этой функции.
std::async
позволяет установить политику запуска задачи:
std::launch::async
выполняет вызываемый объектf
в новом потоке выполнения, как если бы он был запущен с помощьюstd::thread(std::forward<F>(f), std::forward<Args>(args)...)
, за исключением того, что если функцияf
возвращает значение или создает исключение, то оно хранится в общем состоянии, доступном черезstd::future
, котороеasync
возвращает вызывающей стороне.std::launch::deferred
не порождает новый поток выполнения. Вместо этого функция выполняется лениво: первый вызов несинхронной функции ожидания вstd::future
, возвращенном вызывающему объекту, вызовет копиюf
(как rvalue) с копиямиargs...
(также передается как rvalues) в текущем потоке (который не обязательно должен быть потоком, который изначально вызывалstd::async
). Результат или исключение помещается в общее состояние, объектfuture
приводится в состояние готовности. Дальнейший запрос результата из того жеstd::future
немедленно вернёт результат.std::launch::async | std::launch::deferred
в зависимости от реализации, производится или асинхронное выполнение, или ленивое.- Если ни
std::launch::async
, ниstd::launch::deferred
не установлен, то задаётся политика по умолчаниюstd::launch::async | std::launch::deferred
.
std::async
возвращает объект std::future
для получения значения.
std::async
бросает исключение std::system_error
, если политика запуска равна std::launch::async
, но реализация не может запустить новый поток, или std::bad_alloc
, если память для внутренних структур данных не может быть выделена.
Если std::future
, полученный из std::async
, не сохраняется, деструктор std::future
блокирует поток до завершения асинхронной операции, как при синхронном выполнении:
std::async(std::launch::async, []{ f(); }); // деструктор временной переменной ждет выполнения f()
std::async(std::launch::async, []{ g(); }); // не запускается, пока не завершится f()
Обратите внимание, что деструкторы объектов std::future
, полученных не из std::async
, не блокируют поток.
Пример:
#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RandomIt mid = beg + len/2;
auto handle = std::async(std::launch::async,
parallel_sum<RandomIt>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
int main()
{
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
X x;
// Вызывает (&x)->foo(42, "Hello") с политикой по умолчанию:
// может напечатать "Hello 42" сразу или отложить выполнение
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Вызывает x.bar("world!") с политикой отложенного выполнения:
// печатает "world!", когда вызывается a2.get() или a2.wait()
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Вызывает X()(43); политикой асинхронного выполнения:
// печатает "43" сразу
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // печатает "world!"
std::cout << a3.get() << '\n'; // печатает "53"
} // если a1 не выполнено в этом месте, деструктор a1 напечатает "Hello 42" здесь
/*
Возможный вывод:
The sum is 10000
43
world!
53
Hello 42
*/
Ожидание результата в нескольких потоках с помощью std::shared_future
До сих пор во всех примерах использовался объект std::future
. Но у него есть ограничения, в частности, результата может дожидаться только один поток. Если наступления одного и того же события нужно дожидаться сразу из нескольких потоков, следует воспользоваться std::shared_future
.
Хотя std::future
вполне справляется со всей синхронизацией, необходимой для переноса данных из одного потока в другой, вызовы методов std::future
не синхронизированы друг с другом. Если обращаться к одному и тому же объекту std::future
из нескольких потоков без дополнительной синхронизации, возникнет состояние гонки за данными и неопределенное поведение. std::future
моделирует исключительное владение результатом асинхронных вычислений, а одноразовая природа функции get()
лишает конкурентный доступ всякого смысла – значение можно извлечь только одним потоком, поскольку после первого же вызова get()
значения для извлечения уже не останется.
Если же ваш проект требует, чтобы ожидать результата выполнения функции могли сразу несколько потоков, нужно использовать std::shared_future
. Если std::future
допускает только перемещение (чтобы право владения передавалось между экземплярами, но чтобы в конкретный момент только один экземпляр ссылался на конкретный результат асинхронного вычисления), экземпляры std::shared_future
допускают копирование, поэтому могут существовать сразу несколько объектов, ссылающихся на одно и то же связанное состояние.
Однако работа с одним и тем же объектом std::shared_future
из разных потоков по прежнему не синхронизирована, и во избежание проблем, следует передать каждому заинтересованному потоку собственную копию объекта std::shared_future
, тогда все внутренние операции будут корректно синхронизированы средствами библиотеки. Таким образом, безопасность доступа к асинхронному состоянию из нескольких потоков обеспечивается, если каждый поток обращается к этому состоянию посредством собственного объекта std::shared_future
.
Сконструировать объект std::shared_future
можно либо передав право собственности его конструктору из std::future
с помощью std::move
:
std::shared_future sf(std::move(future));
Для r-value вызов std::move
не требуется:
std::promise<int> p;
std::shared_future<int> sf(p.get_future());
std::promise<int> p;
auto sf = p.get_future().share();
Пример использования std::shared_future
для реализации барьера:
#include <iostream>
#include <future>
#include <chrono>
int main()
{
std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
std::shared_future<void> ready_future(ready_promise.get_future());
std::chrono::time_point<std::chrono::high_resolution_clock> start;
auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t1_ready_promise.set_value();
ready_future.wait(); // ждет сигнала из main()
return std::chrono::high_resolution_clock::now() - start;
};
auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli>
{
t2_ready_promise.set_value();
ready_future.wait(); // ждет сигнала из main()
return std::chrono::high_resolution_clock::now() - start;
};
auto fut1 = t1_ready_promise.get_future();
auto fut2 = t2_ready_promise.get_future();
auto result1 = std::async(std::launch::async, fun1);
auto result2 = std::async(std::launch::async, fun2);
// ждем, когда потоки станут готовы
fut1.wait();
fut2.wait();
// потоки готовы, начинаем отсчет
start = std::chrono::high_resolution_clock::now();
// сигнализируем потокам, чтобы они продолжили выполнение
ready_promise.set_value();
std::cout << "Thread 1 received the signal "
<< result1.get().count() << " ms after start\n"
<< "Thread 2 received the signal "
<< result2.get().count() << " ms after start\n";
}
/*
Возможный вывод:
Thread 1 received the signal 0.072 ms after start
Thread 2 received the signal 0.041 ms after start
*/