Многопоточность в C++. Простая защита данных с помощью мьютекса

Добавлено 22 декабря 2021 в 00:27

Одним из ключевых преимуществ (перед использованием нескольких процессов) применения потоков для конкурентности является возможность совместного использования (разделения) данных несколькими потоками.

Представьте на минуту, что вы живете в одной квартире с приятелем. У вас одна кухня и одна ванная на двоих. Обычно ванной не пользуются одновременно несколько человек, и то, что сосед слишком долго плещется в воде, вынуждая вас дожидаться своей очереди, не может не раздражать. Возможно, одному из вас захочется запечь в духовке колбаски, в то время как у другого там готовятся кексы, и из этого тоже не выйдет ничего хорошего. Ну и всем знакомо чувство досады, когда при совместно используемом оборудовании вы на полпути к решению какой-нибудь задачи вдруг обнаруживаете, что кто-то взял что-то нужное вам в данный момент или что-то изменил, а вы рассчитывали, что всё останется в прежнем состоянии или на своих местах.

То же самое происходит и с потоками. Если они совместно используют данные, для них нужны правила, определяющие, какой поток и к каким данным может получить доступ, когда и как любые обновления данных будут передаваться другим потокам, интересующимся этими данными. Некорректная работа с общими данными – одна из основных причин ошибок, связанных с конкурентностью.

Когда дело доходит до совместной работы с данными нескольких потоков, то все проблемы возникают из-за последствий изменения этих данных. Если все совместно используемые данные доступны только для чтения, проблем не будет, поскольку данные, считываемые одним потоком, не зависят от того, читает другой поток те же данные или нет. Но если один или несколько потоков, совместно использующих данные, начинают вносить в них изменения, создаются серьезные предпосылки для возникновения проблем. В таком случае следует обеспечить приемлемость конечных результатов.

Предположим, вы покупаете билет в кино. Если кинотеатр большой, билеты будут продавать сразу несколько кассиров, обслуживая одновременно несколько человек. Если кто-то в это же время покупает билет на тот же сеанс в другой кассе, то выбор места зависит от того, кто первым его закажет, вы или другой. Если осталось всего несколько мест, очередность может стать решающей: возможна настоящая гонка за последними билетами. Это пример состояния гонки: какое место вы получите и получите ли вообще, зависит от порядка двух покупок.

При конкурентности состоянием гонки является всё, что зависит от порядка выполнения операций в двух и более потоках относительно друг друга: потоки участвуют в гонке по выполнению соответствующих операций. В стандарте C++ также определяется понятие гонки за данными, обозначающее конкретный тип состояния гонки, возникающий из-за одновременного изменения одного и того же объекта. Гонки за данными вызывают опасное неопределенное поведение.

Ошибки в состоянии гонки возникают, когда для завершения операции требуется выполнение нескольких инструкций процессора. Состояние гонки зачастую трудно определить и сложно воспроизвести. Обычно, вероятность, что между последовательно выполняемыми инструкциями вклинится другой поток, невелика. По мере увеличения нагрузки на систему и количества выполнения операции возрастает и вероятность возникновения проблемной последовательности выполнения. Проблемы практически неизменно появляются в самое неподходящее время. При запуске приложения под отладчиком проблема вообще может исчезать, поскольку отладчик влияет на выполнение программы.

Есть несколько способов, позволяющих справиться с проблемными состояниями гонок. Самый простой вариант – заключить структуру данных в механизм защиты, чтобы гарантировать, что промежуточные состояния, в которых нарушены инварианты, будут видны только потоку, выполняющему изменения. С позиции других потоков, обращающихся к этой же структуре данных, такие изменения либо еще не начнутся, либо уже завершатся. Стандартная библиотека C++ предоставляет несколько таких механизмов.

Есть еще один вариант – изменить конструкцию структуры данных и ее инвариантов так, чтобы модификации вносились в виде серии неделимых изменений, каждая из которых сохраняет инварианты. Обычно это называется программированием без блокировок (lock-free programming), и реализовать ее нелегко.

Простая защита данных с помощью мьютекса

std::mutex

Основным механизмом защиты совместно используемых данных, обеспеченным стандартом C++, является мьютекс.

Итак, имеется совместно используемая структура данных, например связный список, и его нужно защитить от состояния гонки и возможных нарушений инвариантов. Наверное, неплохо было бы получить возможность помечать все фрагменты кода, обращающиеся к структуре данных, как взаимоисключающие, чтобы при выполнении одного из них каким-либо потоком любой другой поток, пытающийся получить доступ к этой структуре данных, был бы вынужден ждать, пока первый поток не завершит выполнение такого фрагмента. Тогда поток не смог бы увидеть нарушенный инвариант, кроме тех случаев, когда он сам выполнял бы модификацию. Именно это будет получено при использовании примитива синхронизации под названием «мьютекс», означающего взаимное исключение (mutual exclusion). Перед получением доступа к совместно используемой структуре данных мьютекс, связанный с ней, блокируется, а когда доступ к ней заканчивается, блокировка с него снимается. Библиотека потоков гарантирует, что, как только один поток заблокирует определенный мьютекс, все остальные потоки, пытающиеся его заблокировать, должны будут ждать, пока поток, который успешно заблокировал мьютекс, его не разблокирует. Тем самым гарантируется, что все потоки видят непротиворечивое представление совместно используемых данных без нарушенных инвариантов. Мьютексы – главный механизм защиты данных, доступный в C++, но панацеей от всех бед их не назовешь: важно структурировать код таким образом, чтобы защитить нужные данные и избежать состояний гонки, присущих используемым интерфейсам. У мьютексов имеются и собственные проблемы в виде взаимной блокировки и защиты либо слишком большого, либо слишком малого объема данных.

Класс std::mutex – это примитив синхронизации, который может использоваться для защиты общих данных от одновременного доступа нескольких потоков.

std::mutex предлагает эксклюзивную, нерекурсивную семантику владения:

  • вызывающий поток владеет мьютексом с момента успешного вызова методов lock или try_lock до вызова unlock;
  • когда поток владеет мьютексом, все остальные потоки блокируются (при вызове lock) или получают false (при вызове try_lock), если они пытаются претендовать на владение мьютексом;
  • вызывающий поток не должен владеть мьютексом до вызова lock или try_lock.

Поведение программы не определено, если мьютекс уничтожается, всё еще будучи заблокированным, или если поток завершается, не разблокировав мьютекс.

std::mutex не является ни копируемым, ни перемещаемым.

Если метод lock вызывается потоком, который уже владеет мьютексом, поведение не определено: например, программа может попасть в deadlock.

Метод try_lock может ошибаться и возвращать false, даже если мьютекс в данный момент не заблокирован никаким другим потоком.

Если try_lock вызывается потоком, который уже владеет мьютексом, поведение не определено.

Мьютекс должен быть разблокирован тем потоком выполнения, который его заблокировал, в противном случае поведение не определено.

std::mutex обычно не захватывается напрямую, поскольку при этом нужно помнить о необходимости вызова unlock() на всех путях выхода из функции, в том числе возникающих из-за выдачи исключений. Стандартной библиотекой C++ предоставляются классы std::unique_lock, std::lock_guard или std::scoped_lock (начиная с C++17) для более безопасного управления захватом мьютексов.

Мьютекс является объектом операционной системы, поэтому для работы с ним через API ОС, можно получить handle с помощью метода native_handle.

Пример использования мьютекса:

#include <iostream>
#include <map>
#include <string>
#include <chrono>
#include <thread>
#include <mutex>
 
std::map<std::string, std::string> g_pages;
std::mutex g_pages_mutex;
 
void save_page(const std::string &url)
{
    // имитация получения длинной страницы
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::string result = "fake content";
 
    std::lock_guard<std::mutex> guard(g_pages_mutex);
    g_pages[url] = result;
}
 
int main() 
{
    std::thread t1(save_page, "http://foo");
    std::thread t2(save_page, "http://bar");
    t1.join();
    t2.join();
 
    // доступ к g_pages без блокировки теперь безопасен,
    // поскольку потоки присоединены
    for (const auto &pair : g_pages) {
        std::cout << pair.first << " => " << pair.second << '\n';
    }
}

В примере выше используются глобальные переменные для структуры данных и мьютекса. Иногда в таком использовании глобальных переменных есть определенный смысл, однако в большинстве случаев мьютекс и защищенные данные помещаются в один класс, а не в глобальные переменные. Это соответствует стандартным правилам объектно-ориентированного проектирования: помещение их в один класс служит признаком связанности друг с другом, позволяя инкапсулировать функциональность и обеспечить защиту. В данном случае save_page станет методом класса, а мьютекс и защищаемые данные – закрытыми членами класса, что значительно упростит определение того, какой код имеет доступ к данным, и, следовательно, какой код должен заблокировать мьютекс. Если все методы класса блокируют мьютекс перед доступом к защищаемым данным и разблокируют его по завершении доступа, данные будут надежно защищены от любого обращающегося к ним кода. Однако, это не всегда так: если один из методов класса возвращает указатель или ссылку на защищаемые данные, то в защите будет проделана большая дыра. Теперь обратиться к защищенным данным и, возможно, их изменить, не блокируя мьютекс, сможет любой код, имеющий доступ к этому указателю или ссылке. Поэтому защита данных с помощью мьютекса требует тщательной проработки интерфейса. Помимо проверки того, что методы не возвращают указатели или ссылки вызывающему их коду, важно также убедиться, что они не передают эти указатели или ссылки тем функциям, которые вызываются ими и не контролируются вами. Такая передача не менее опасна: эти функции могут хранить указатель или ссылку в том месте, где их позже можно использовать без защиты, предоставляемой мьютексом. В этом смысле особенно опасны функции, которые предоставляются во время выполнения программы в виде аргументов или другим способом. К сожалению, помочь справиться с проблемой такого рода библиотека потоков C++ не в состоянии, задача блокировки нужного мьютекса для защиты данных возлагается на программиста. В то же время можно воспользоваться рекомендацией, которая поможет в подобных случаях: не передавайте указатели и ссылки на защищенные данные за пределы блокировки никаким способом: ни возвращая их из функции, ни сохраняя во внешне видимой памяти, ни передавая в качестве аргументов функциям, предоставленным пользователем.

Применение мьютекса или другого механизма для защиты совместно используемых данных не дает полной гарантии защищенности от состояния гонки. Рассмотрим структуру данных стека. Пусть над нашим стеком можно проводить следующие операции: можно поместить в стек новый элемент методом push(), извлечь элемент из стека методом pop(), прочитать верхний элемент с помощью top(), проверить, не является ли стек пустым, с помощью empty(), и прочитать количество элементов стека методом size().

#include <deque>
#include <cstddef>
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
    explicit stack(const Container&);
    explicit stack(Container&& = Container());
    template <class Alloc> explicit stack(const Alloc&);
    template <class Alloc> stack(const Container&, const Alloc&);
    template <class Alloc> stack(Container&&, const Alloc&);
    template <class Alloc> stack(stack&&, const Alloc&);

    bool empty() const;
    size_t size() const;
    T& top();
    T const& top() const;
    void push(T const&);
    void push(T&&);
    void pop();
    void swap(stack&&);
};

Даже функция top() возвращает копию, а не ссылку, и внутренние данные защищены с помощью мьютекса, этот интерфейс всё равно не будет застрахован от возникновения гонки. Проблема в том, что полагаться на результаты работы функций empty() и size() нельзя. Хотя на момент вызова они, вероятно, и были достоверными, но после возврата из функции любой другой поток может обратиться к стеку и затолкнуть в него новые элементы (push()), либо забрать существующие (pop()), причем до того, как поток, вызывающий empty() или size(), сможет воспользоваться этой информацией.

Более безопасный вариант реализации стека с упрощённым интерфейсом:

#include <exception>
#include <stack>
#include <mutex>
#include <memory>

struct empty_stack: std::exception
{
    const char* what() const throw()
    {
        return "empty stack";
    }
    
};

template<typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack(){}
    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data=other.data;
    }
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(new_value);
    }
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if(data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
        data.pop();
        return res;
    }
    void pop(T& value)
    {
        std::lock_guard<std::mutex> lock(m);
        if(data.empty()) throw empty_stack();
        value=data.top();
        data.pop();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};

int main()
{
    threadsafe_stack<int> si;
    si.push(5);
    si.pop();
    if(!si.empty())
    {
        int x;
        si.pop(x);
    }
}

std::timed_mutex

Класс timed_mutex – это примитив синхронизации, который может использоваться для защиты общих данных от одновременного доступа нескольких потоков.

Подобно мьютексу, timed_mutex предлагает эксклюзивную, нерекурсивную семантику владения. Кроме того, timed_mutex предоставляет возможность попытаться захватить timed_mutex с таймаутом с помощью методов try_lock_for() и try_lock_until().

Метод try_lock_for():

  • Пытается заблокировать мьютекс. Поток ожидает до тех пор, пока не истечет указанное время ожидания, или не будет получена блокировка, в зависимости от того, что наступит раньше. При успешном получении блокировки возвращает true, в противном случае возвращает false.
  • Если timeout_duration меньше или равно timeout_duration.zero(), то функция ведет себя как try_lock().
  • Эта функция может блокировать поток дольше, чем timeout_duration, из-за задержек в работе планировщика или конкуренции за ресурсы между потоками.
  • Стандарт рекомендует использовать steady_clock для измерения длительности. Если реализация использует вместо этого system_clock, время ожидания также может быть чувствительно к корректировке часов.
  • Как и в случае с try_lock(), этой функции разрешено ложно возвращать false, даже если мьютекс не был заблокирован каким-либо другим потоком в какой-то момент во время timeout_duration.
  • Если try_lock_for вызывается потоком, который уже владеет мьютексом, поведение не определено.

Пример:

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <sstream>
 
std::mutex cout_mutex; // управление доступом к std::cout
std::timed_mutex mutex;
 
void job(int id) 
{
    using Ms = std::chrono::milliseconds;
    std::ostringstream stream;
 
    for (int i = 0; i < 3; ++i) {
        if (mutex.try_lock_for(Ms(100))) {
            stream << "success ";
            std::this_thread::sleep_for(Ms(100));
            mutex.unlock();
        } else {
            stream << "failed ";
        }
        std::this_thread::sleep_for(Ms(100));
    }
 
    std::lock_guard<std::mutex> lock(cout_mutex);
    std::cout << "[" << id << "] " << stream.str() << "\n";
}
 
int main() 
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back(job, i);
    }
 
    for (auto& i: threads) {
        i.join();
    }
}

/*
Возможный вывод:
[0] failed failed failed 
[3] failed failed success 
[2] failed success failed 
[1] success failed success
*/

Метод try_lock_until() работает так же, как try_lock_for(), но принимает std::chrono::time_point в качестве аргумента. Если timeout_time уже прошел, эта функция ведет себя как try_lock().

Пример:

#include <thread>
#include <iostream>
#include <chrono>
#include <mutex>
 
std::timed_mutex test_mutex;
 
void f()
{
    auto now=std::chrono::steady_clock::now();
    test_mutex.try_lock_until(now + std::chrono::seconds(10));
    std::cout << "hello world\n";
}
 
int main()
{
    std::lock_guard<std::timed_mutex> l(test_mutex);
    std::thread t(f);
    t.join();
}

Теги

C++ / CppMutex / Мьютексstd::mutexstd::threadstd::timed_mutexSTL / Standard Template Library / Стандартная библиотека шаблоновМногопоточность

На сайте работает сервис комментирования DISQUS, который позволяет вам оставлять комментарии на множестве сайтов, имея лишь один аккаунт на Disqus.com.

В случае комментирования в качестве гостя (без регистрации на disqus.com) для публикации комментария требуется время на премодерацию.