Многопоточность в C++. Возврат значений и проброс исключений

Добавлено 1 января 2021 в 17:34

Предположим, что имеются какие-то длительные вычисления, которые, как ожидается, вернут со временем полезный результат, значение которого вам пригодится позже. Для выполнения вычислений можно запустить новый поток, но это будет означать, что следует позаботиться о передаче результата в основную программу, поскольку std::thread не предоставляет непосредственного механизма для выполнения этой задачи.

Стандартная библиотека предоставляет средства для получения возвращаемых значений и перехвата исключений, создаваемых асинхронными задачами (т. е. функциями, запущенными в отдельных потоках). Эти значения передаются через общие объекты состояния выполнения, в которые асинхронная задача может записать свое возвращаемое значение или сохранить исключение, и которые могут быть проверены другими потоками, содержащими экземпляры std::future или std::shared_future, ссылающиеся на это общее состояние.

Кроме непосредственно механизма возврата значений с помощью std::future и std::promise, стандартная библиотека предоставляет более высокоуровневые средства для запуска задач, которые должны вернуть значение. std::packaged_task является обёрткой для ваших функций, которая позволяет автоматизировать сохранение результата в std::promise. А std::async является наиболее высокоуровневым инструментом для автоматического запуска задачи в отдельном потоке с возможностью позже запросить результат выполнения. Начнём рассмотрение с базовых низкоуровневых инструментов, чтобы понимать механику работы.

Низкоуровневые средства: возврат значений и проброс исключений с помощью std::future и std::promise

std::promise – это базовый механизм, позволяющий передавать значение между потоками. Каждый объект std::promise связан с объектом std::future. Это пара классов, один из которых (std::promise) отвечает за установку значения, а другой (std::future) – за его получение. Первый поток может ожидать установки значения с помощью вызова метода std::future::wait или std::future::get, в то время, как второй поток установит это значение с помощью вызова метода std::promise::set_value, или передаст первому исключение вызовом метода std::promise::set_exception.

Возврат значения

Шаблон класса std::promise предоставляет средство для сохранения значения или исключения, которое позже асинхронно забирается через объект std::future, созданный объектом std::promise.

Шаблон класса std::future предоставляет механизм доступа к результату асинхронных операций.

Пара объектов (std::promise и связанный с ним std::future) образуют канал связи между потоками. std::promise предоставляет операцию push для этого канала связи. Значение, записанное с помощью promise, может быть прочитано с помощью объекта future.

Каждый объект promise связан с общим состоянием выполнения, которое может быть еще не установлено или может хранить значение или исключение. Когда асинхронная операция готова вернуть результат, она может сделать это, изменив общее состояние (например, с помощью метода std::promise::set_value). Объект std::future (его можно получить с помощью метода std::promise::get_future) связывается с этим же самым общим состоянием. Поток, запустивший асинхронную операцию может затем использовать различные методы для проверки, ожидания готовности или извлечения значения из std::future. Эти методы могут блокировать выполнение, если асинхронная операция еще не предоставила значение.

Сохранение результата или исключения в std::promise приводит операцию в состояние готовности. Эта операция разблокирует поток, ожидающий результата. Если объект promise был уничтожен, а результат (значение или исключение) не был сохранён, то сохраняется исключение типа std::future_error с кодом ошибки std::future_errc::broken_promise, происходит приведение в состояние готовности.

Обратите внимание:

  • объект std::promise предназначен для использования только один раз, запросить значение (get()) из std::future можно только один раз;
  • с помощью std::future результата может дожидаться только один поток.. Параллельный доступ к одному и тому же общему состоянию может приводить к конфликтам.

Итак, как этим пользоваться?

Шаблон std::promise<T> позволяет устанавливать значение (типа T), которое позже можно прочитать через связанный объект std::future<T>. Ожидающий поток может заблокироваться на фьючерсе, а поток, предоставляющий данные,– воспользоваться другой половиной пары, промисом (promise, иногда называют обещанием), для установки связанного значения и приведения фьючерса в состояние готовности. Получить объект фьючерса std::future, связанный с заданным объектом std::promise, можно вызовом метода get_future(). Когда значение в promise установлено (с помощью метода set_value()), фьючерс приводится в состояние готовности и может использоваться для извлечения сохраненного значения. Если объект std::promise уничтожить без установки значения, вместо него будет сохранено исключение.

В нашем распоряжении есть несколько методов:

  • std::promise:
    • get_future() позволяет получить объект std::future, связанный с нашим объектом std::promise;
    • set_value(value) сохраняет значение, которое можно запросить с помощью связанного объекта std::future;
    • set_exception(exception) сохраняет исключение, которое будет брошено в потоке, запросившем значение из объекта std::future;
    • set_value_at_thread_exit() и set_exception_at_thread_exit() сохраняют значение или исключение после завершения потока аналогично тому, как работает std::notify_all_at_thread_exit;
  • std::future:
    • get() дожидается, когда promise сохранит результат, и возвращает его. После вызова этого метода объект future удаляет ссылку на общее состояние, и метод valid() начинает возвращать false. Вызов для невалидного (valid() возвращает false) объекта приводит к неопределённому поведению или исключению (зависит от реализации). Если в promise было записано исключение, то оно будет брошено при вызове;
    • valid() проверяет, связан ли объект future с каким-то общим состоянием. Вызов других методов для невалидного объекта приводит к неопределённому поведению или исключению (зависит от реализации);
    • wait() блокирует текущий поток, пока promise не запишет значение. Вызов для невалидного (valid() возвращает false) объекта приводит к неопределённому поведению или исключению (зависит от реализации);
    • wait_for() и wait_until() работают аналогично методу wait, но с ограничением на время ожидания. Возвращают future_status;
    • share() конструирует и возвращает shared_future. Несколько объектов std::shared_future могут ссылаться на одно и то же общее состояние, что невозможно для std::future. После вызова метода объект future удаляет ссылку на общее состояние, и метод valid() начинает возвращать false.

Пример:

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
 
void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> accumulate_promise)
{
    int sum = std::accumulate(first, last, 0);
    accumulate_promise.set_value(sum);  // оповестить future
}
 
int main()
{
    // Демонстрация использования promise<int> для передачи результата между потоками.
    std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    std::promise<int> accumulate_promise;
    std::future<int> accumulate_future = accumulate_promise.get_future();
    std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
                            std::move(accumulate_promise));
 
    // future::get() будет ждать, пока future не будет иметь допустимый результат,
    // после чего запросит его.
    // Вызов wait() перед get() не требуется.
    //accumulate_future.wait();  // ждать результата
    std::cout << "result=" << accumulate_future.get() << '\n';
    work_thread.join();  // ждем завершения потока
}

Проброс исключения

Предположим, что вызываемая в отдельном потоке функция может выдавать исключение:

double square_root(double x){
    if(x<0) {
        throw std::out_of_range("x<0");
    }
    return sqrt(x);
}

Если в функцию square_root() передается значение -1, она выдает исключение, которое становится видимым вызывающему коду. В идеале при выполнении этой функции в отдельном потоке хотелось бы получить точно такое же поведение, как при однопоточном варианте выполнения: было бы неплохо, чтобы код, вызвавший future::get(), мог видеть исключение.

std::promise предоставляет возможности сохранить исключение. Если вместо значения требуется сохранить исключение, то вместо set_value() вызывается метод set_exception(). Исключение сохраняется во фьючерсе на месте сохраненного значения, фьючерс приводится в состояние готовности и вызов get() бросает сохраненное исключение. (Примечание: в стандарте не указано, является ли повторно выдаваемое исключение исходным объектом исключения или его копией, разные компиляторы и библиотеки делают выбор по своему усмотрению.)

Обычно для исключения, выдаваемого в качестве части алгоритма, это делается в блоке catch:

extern std::promise<double> some_promise;
try{
    some_promise.set_value(square_root(x));
}
catch(...){
    some_promise.set_exception(std::current_exception());
}
some_promise.set_exception(std::make_exception_ptr(std::logic_error("foo ")));

Такой код выглядит намного понятнее, чем код с применением блока try-catch, – это не только упрощает код, но и расширяет возможности компилятора в области оптимизации кода.

То же самое происходит, если функция заключена в std::packaged_task: когда при вызове задачи этой функцией выдается исключение, оно сохраняется во фьючерсе на месте результата, готового к выдаче при вызове функции get().

Аналогичное поведение может быть достигнуто с помощью std::async:

std::future<double> f = std::async(square_root, -1);
double y = f.get();

Еще один способ сохранения исключения во фьючерсе заключается в уничтожении связанного с фьючерсом объекта std::promise или объекта std::packaged_task без вызова каких-либо set-функций в отношении promise или без обращения к упакованной задаче. В этом случае деструктор std::promise или std::packaged_task сохранит исключение std::future_error с кодом ошибки std::future_errc::broken_promise в связанном состоянии, если фьючерс еще не перешел в состояние готовности: созданием фьючерса дается обещание предоставить значение или исключение, а уничтожением источника этого значения или исключения без их предоставления это обещание нарушается. Если бы компилятор в таком случае ничего не сохранял во фьючерсе, ожидающие потоки могли бы ожидать бесконечно.

Передача событий без состояния

promise-future можно использовать не только для передачи значения, но и просто для уведомления (хотя для этого можно использовать condition variables), если сохранить тип void. Например, можно сделать барьер (в С++20 для этого есть специальные средства).

Пример:

#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>

void do_work(std::promise<void> barrier)
{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    barrier.set_value();
}
 
int main()
{
    // Демонстрация использования promise<void>
    // для сигнализации состояния между потоками.
    std::promise<void> barrier;
    std::future<void> barrier_future = barrier.get_future();
    std::thread new_work_thread(do_work, std::move(barrier));
    barrier_future.wait();
    new_work_thread.join();
}

Среднеуровневые средства: обёртка для функций и callable объектов std::packaged_task

Использование promise – это не единственный способ возврата значения из функции, выполняемой в другом потоке. Сделать это можно также заключением задачи в экземпляр std::packaged_task<>. Шаблон класса std::packaged_task является абстракцией более высокого уровня, чем std::promise.

Шаблон класса std::packaged_task обёртывает любую вызываемую цель (функцию, лямбда-выражение, bind expression или другой callable объект), чтобы ее можно было вызвать асинхронно с получением возвращаемого значения или исключения. Возвращаемое значение или вызванное исключение хранится в общем состоянии, доступ к которому можно получить через объекты std::future.

std::packaged_task работает так же, как если бы мы создали объект std::promise и сохранили в него результат работы функции.

Шаблон класса std::packaged_task<> привязывает фьючерс к функции или вызываемому объекту. Когда вызывается объект std::packaged_task<>, он вызывает связанную функцию или объект и приводит фьючерс в состояние готовности после возврата функцией значения или броска исключения. Этим классом можно воспользоваться как строительным блоком для пула потоков или других схем управления задачами, например, для запуска всех задач в специально выделенном потоке, работающем в фоновом режиме. Таким образом удается абстрагироваться от подробностей задач – диспетчер имеет дело только с экземплярами std::packaged_task, а не с отдельно взятыми функциями.

Параметром шаблона для std::packaged_task<> является сигнатура функции, например void() для функции, не получающей параметры и не имеющей возвращаемых значений, или int(std::string&,double*) для функции, получающей не-const-ссылку на std::string и указатель на double и возвращающей значение типа int. При создании экземпляра std::packaged_task ему следует передать функцию или вызываемый объект, принимающий указанные параметры, а затем возвращающий тип, который можно преобразовать в указанный тип возвращаемого значения. Точного совпадения типов не требуется, можно сконструировать объект std::packaged_task<double(double)> из функции, принимающей значение типа int и возвращающей значение типа float, поскольку возможно неявное приведение типов. Тип возвращаемого значения, указанный в сигнатуре функции, определяет тип объекта std::future<>, возвращаемого методом get_future(), а заданный в сигнатуре список аргументов используется для определения сигнатуры оператора вызова в классе packaged_task.

Объект std::packaged_task является вызываемым, значит, его можно обернуть объектом std::function или передать конструктору std::thread в качестве функции потока, или даже вызвать напрямую.

Когда std::packaged_task вызывается, аргументы, предоставленные оператору вызова функции, передаются содержащейся в этом объекте функции, а возвращаемое значение сохраняется в качестве результата в объекте std::future, полученном от get_future().Таким образом, задачу можно заключить в объект std::packaged_task и извлечь фьючерс перед передачей объекта std::packaged_task в отдельный поток. Когда понадобится результат, можно будет дождаться готовности фьючерса.

Итак, как это использовать?

В нашем распоряжении несколько методов:

  • get_future() позволяет получить связанный с состоянием задачи объект std::future, с помощью которого можно получить возвращаемое значение функции или брошенное исключение;
  • operator() позволяет вызвать обёрнутую функцию, нужно передать аргументы функции;
  • make_ready_at_thread_exit() позволяет дождаться полного завершения потока перед тем, как привести future в состояние готовности;
  • reset() очищает результаты предыдущего запуска задачи.

Пример:

#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
 
// уникальная функция, позволяющая избежать неоднозначности
// набора перегрузки std::pow
int f(int x, int y) { return std::pow(x,y); }
 
void task_lambda()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b); 
    });
    std::future<int> result = task.get_future();
 
    task(2, 9);
 
    std::cout << "task_lambda:\t" << result.get() << '\n';
}
 
void task_bind()
{
    std::packaged_task<int()> task(std::bind(f, 2, 11));
    std::future<int> result = task.get_future();
 
    task();
 
    std::cout << "task_bind:\t" << result.get() << '\n';
}
 
void task_thread()
{
    std::packaged_task<int(int,int)> task(f);
    std::future<int> result = task.get_future();
 
    std::thread task_td(std::move(task), 2, 10);
    task_td.join();
 
    std::cout << "task_thread:\t" << result.get() << '\n';
}
 
int main()
{
    task_lambda();
    task_bind();
    task_thread();
}

Пример с ожиданием полного завершения потока:

#include <future>
#include <iostream>
#include <chrono>
#include <thread>
#include <functional>
#include <utility>
 
void worker(std::future<void>& output)
{
    std::packaged_task<void(bool&)> my_task{ [](bool& done) { done=true; } };
 
    auto result = my_task.get_future();
 
    bool done = false;
 
    my_task.make_ready_at_thread_exit(done); // выполнить задачу сразу
 
    std::cout << "worker: done = " << std::boolalpha << done << std::endl;
 
    auto status = result.wait_for(std::chrono::seconds(0));
    if (status == std::future_status::timeout)
        std::cout << "worker: result is not ready yet" << std::endl;
 
    output = std::move(result);
}
 
 
int main()
{
    std::future<void> result;
 
    std::thread{worker, std::ref(result)}.join();
 
    auto status = result.wait_for(std::chrono::seconds(0));
    if (status == std::future_status::ready)
        std::cout << "main: result is ready" << std::endl;
}

Пример со сбросом результатов предыдущего выполнения:

#include <iostream>
#include <cmath>
#include <thread>
#include <future>
 
int main()
{
    std::packaged_task<int(int,int)> task([](int a, int b) {
        return std::pow(a, b);
    });
    std::future<int> result = task.get_future();
    task(2, 9);
    std::cout << "2^9 = " << result.get() << '\n';
 
    task.reset();
    result = task.get_future();
    std::thread task_td(std::move(task), 2, 10);
    task_td.join();
    std::cout << "2^10 = " << result.get() << '\n';
}

Высокоуровневые средства: запуск задач асинхронно с помощью std::async

Всё, что было описано выше – это хорошо, но может казаться слишком сложным для того, чтобы просто запустить задачу в отдельном потоке и получить значение. Иногда хочется иметь ещё более высокоуровневые инструменты и запускать задачи в одну строчку кода. Стандартная библиотека C++ предоставляет такую возможность.

std::async запускает функцию f асинхронно (потенциально в отдельном потоке, который может быть частью пула потоков) и возвращает std::future, который в конечном итоге будет содержать результат вызова этой функции.

std::async позволяет установить политику запуска задачи:

  • std::launch::async выполняет вызываемый объект f в новом потоке выполнения, как если бы он был запущен с помощью std::thread(std::forward<F>(f), std::forward<Args>(args)...), за исключением того, что если функция f возвращает значение или создает исключение, то оно хранится в общем состоянии, доступном через std::future, которое async возвращает вызывающей стороне.
  • std::launch::deferred не порождает новый поток выполнения. Вместо этого функция выполняется лениво: первый вызов несинхронной функции ожидания в std::future, возвращенном вызывающему объекту, вызовет копию f (как rvalue) с копиями args... (также передается как rvalues) в текущем потоке (который не обязательно должен быть потоком, который изначально вызывал std::async). Результат или исключение помещается в общее состояние, объект future приводится в состояние готовности. Дальнейший запрос результата из того же std::future немедленно вернёт результат.
  • std::launch::async | std::launch::deferred в зависимости от реализации, производится или асинхронное выполнение, или ленивое.
  • Если ни std::launch::async, ни std::launch::deferred не установлен, то задаётся политика по умолчанию std::launch::async | std::launch::deferred.

std::async возвращает объект std::future для получения значения.

std::async бросает исключение std::system_error, если политика запуска равна std::launch::async, но реализация не может запустить новый поток, или std::bad_alloc, если память для внутренних структур данных не может быть выделена.

Если std::future, полученный из std::async, не сохраняется, деструктор std::future блокирует поток до завершения асинхронной операции, как при синхронном выполнении:

std::async(std::launch::async, []{ f(); }); // деструктор временной переменной ждет выполнения f()
std::async(std::launch::async, []{ g(); }); // не запускается, пока не завершится f()

Обратите внимание, что деструкторы объектов std::future, полученных не из std::async, не блокируют поток.

Пример:

#include <iostream>
#include <vector>
#include <algorithm>
#include <numeric>
#include <future>
#include <string>
#include <mutex>
 
std::mutex m;
struct X {
    void foo(int i, const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << ' ' << i << '\n';
    }
    void bar(const std::string& str) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << str << '\n';
    }
    int operator()(int i) {
        std::lock_guard<std::mutex> lk(m);
        std::cout << i << '\n';
        return i + 10;
    }
};
 
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
    auto len = end - beg;
    if (len < 1000)
        return std::accumulate(beg, end, 0);
 
    RandomIt mid = beg + len/2;
    auto handle = std::async(std::launch::async,
                             parallel_sum<RandomIt>, mid, end);
    int sum = parallel_sum(beg, mid);
    return sum + handle.get();
}
 
int main()
{
    std::vector<int> v(10000, 1);
    std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
 
    X x;
    // Вызывает (&x)->foo(42, "Hello") с политикой по умолчанию:
    // может напечатать "Hello 42" сразу или отложить выполнение
    auto a1 = std::async(&X::foo, &x, 42, "Hello");
    // Вызывает x.bar("world!") с политикой отложенного выполнения:
    // печатает "world!", когда вызывается a2.get() или a2.wait()
    auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
    // Вызывает X()(43); политикой асинхронного выполнения:
    // печатает "43" сразу
    auto a3 = std::async(std::launch::async, X(), 43);
    a2.wait();                     // печатает "world!"
    std::cout << a3.get() << '\n'; // печатает "53"
} // если a1 не выполнено в этом месте, деструктор a1 напечатает "Hello 42" здесь

/*
Возможный вывод:
The sum is 10000
43
world!
53
Hello 42
*/

Ожидание результата в нескольких потоках с помощью std::shared_future

До сих пор во всех примерах использовался объект std::future. Но у него есть ограничения, в частности, результата может дожидаться только один поток. Если наступления одного и того же события нужно дожидаться сразу из нескольких потоков, следует воспользоваться std::shared_future.

Хотя std::future вполне справляется со всей синхронизацией, необходимой для переноса данных из одного потока в другой, вызовы методов std::future не синхронизированы друг с другом. Если обращаться к одному и тому же объекту std::future из нескольких потоков без дополнительной синхронизации, возникнет состояние гонки за данными и неопределенное поведение. std::future моделирует исключительное владение результатом асинхронных вычислений, а одноразовая природа функции get() лишает конкурентный доступ всякого смысла – значение можно извлечь только одним потоком, поскольку после первого же вызова get() значения для извлечения уже не останется.

Если же ваш проект требует, чтобы ожидать результата выполнения функции могли сразу несколько потоков, нужно использовать std::shared_future. Если std::future допускает только перемещение (чтобы право владения передавалось между экземплярами, но чтобы в конкретный момент только один экземпляр ссылался на конкретный результат асинхронного вычисления), экземпляры std::shared_future допускают копирование, поэтому могут существовать сразу несколько объектов, ссылающихся на одно и то же связанное состояние.

Однако работа с одним и тем же объектом std::shared_future из разных потоков по прежнему не синхронизирована, и во избежание проблем, следует передать каждому заинтересованному потоку собственную копию объекта std::shared_future, тогда все внутренние операции будут корректно синхронизированы средствами библиотеки. Таким образом, безопасность доступа к асинхронному состоянию из нескольких потоков обеспечивается, если каждый поток обращается к этому состоянию посредством собственного объекта std::shared_future.

Сконструировать объект std::shared_future можно либо передав право собственности его конструктору из std::future с помощью std::move:

std::shared_future sf(std::move(future));

Для r-value вызов std::move не требуется:

std::promise<int> p;
std::shared_future<int> sf(p.get_future());
std::promise<int> p;
auto sf = p.get_future().share();

Пример использования std::shared_future для реализации барьера:

#include <iostream>
#include <future>
#include <chrono>
 
int main()
{   
    std::promise<void> ready_promise, t1_ready_promise, t2_ready_promise;
    std::shared_future<void> ready_future(ready_promise.get_future());
 
    std::chrono::time_point<std::chrono::high_resolution_clock> start;
 
    auto fun1 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t1_ready_promise.set_value();
        ready_future.wait(); // ждет сигнала из main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
 
    auto fun2 = [&, ready_future]() -> std::chrono::duration<double, std::milli> 
    {
        t2_ready_promise.set_value();
        ready_future.wait(); // ждет сигнала из main()
        return std::chrono::high_resolution_clock::now() - start;
    };
 
    auto fut1 = t1_ready_promise.get_future();
    auto fut2 = t2_ready_promise.get_future();
 
    auto result1 = std::async(std::launch::async, fun1);
    auto result2 = std::async(std::launch::async, fun2);
 
    // ждем, когда потоки станут готовы
    fut1.wait();
    fut2.wait();
 
    // потоки готовы, начинаем отсчет
    start = std::chrono::high_resolution_clock::now();
 
    // сигнализируем потокам, чтобы они продолжили выполнение
    ready_promise.set_value();
 
    std::cout << "Thread 1 received the signal "
              << result1.get().count() << " ms after start\n"
              << "Thread 2 received the signal "
              << result2.get().count() << " ms after start\n";
}

/*
Возможный вывод:
Thread 1 received the signal 0.072 ms after start
Thread 2 received the signal 0.041 ms after start
*/

Теги

C++ / Cppstd::asyncstd::futurestd::packaged_taskstd::promisestd::shared_futureSTL / Standard Template Library / Стандартная библиотека шаблоновМногопоточность

На сайте работает сервис комментирования DISQUS, который позволяет вам оставлять комментарии на множестве сайтов, имея лишь один аккаунт на Disqus.com.

В случае комментирования в качестве гостя (без регистрации на disqus.com) для публикации комментария требуется время на премодерацию.