Общий рекурсивный мьютекс в стандартном C++

Для C++17 запланирован класс shared_mutex. И shared_timed_mutex уже в C++14. (Кто знает, почему они пришли именно в таком порядке, но все равно.) Затем идет recursive_mutex и recursive_timed_mutex, начиная с C++11. Что мне нужно, так это shared_recursive_mutex. Я что-то пропустил в стандарте или мне нужно ждать стандартизированной версии еще три года?

Если в настоящее время такой возможности нет, что может быть простой (первый приоритет) и эффективной (второй приоритет) реализацией такой функции с использованием только стандартного C++?


person Ralph Tandetzky    schedule 14.04.2016    source источник
comment
Я бы не стал писать такое. как насчет того, чтобы не делать блокировку рекурсивной?   -  person David Haim    schedule 14.04.2016
comment
Необходимость рекурсивного мьютекса обычно является признаком того, что код необходимо переработать.   -  person Pete Becker    schedule 14.04.2016


Ответы (5)


Рекурсивное свойство мьютекса работает с термином владелец, который в случае shared_mutex определен нечетко: несколько потоков могут вызывать .lock_shared() в в то же время.

Предполагая, что владелец является потоком, который вызывает .lock() (не .lock_shared()!), реализация рекурсивного общего мьютекса может быть просто получена из shared_mutex:

class shared_recursive_mutex: public shared_mutex
{
public:
    void lock(void) {
        std::thread::id this_id = std::this_thread::get_id();
        if(owner == this_id) {
            // recursive locking
            count++;
        }
        else {
            // normal locking
            shared_mutex::lock();
            owner = this_id;
            count = 1;
        }
    }
    void unlock(void) {
        if(count > 1) {
            // recursive unlocking
            count--;
        }
        else {
            // normal unlocking
            owner = std::thread::id();
            count = 0;
            shared_mutex::unlock();
        }
    }

private:
    std::atomic<std::thread::id> owner;
    int count;
};

Поле .owner нужно объявить атомарным, так как в методе .lock() оно проверяется без защиты от параллельного доступа.

Если вы хотите рекурсивно вызывать метод .lock_shared(), вам необходимо поддерживать карту владельцев, а доступ к этой карте должен быть защищен дополнительным мьютексом.

Разрешение потоку с активным .lock() вызывать .lock_shared() усложняет реализацию.

Наконец, разрешение потоку перемещать блокировку от .lock_shared() к .lock() является нет-нет, так как это приводит к возможной взаимоблокировке, когда два потока пытаются выполнить это продвижение.


Опять же, семантика рекурсивного общего мьютекса будет очень хрупкой, поэтому лучше вообще его не использовать.

person Tsyvarev    schedule 14.04.2016
comment
Хм, если lock_shared вызывается потоком, который уже владеет мьютексом в любом режиме (монопольном или совместно используемом), поведение не определено. -- это раздражает. - person Yakk - Adam Nevraumont; 20.04.2016
comment
@Yakk: указано для обычного shared_mutex. Свойство Recursive определенно изменит семантику мьютекса, поэтому мьютекс будет иметь другие требования. - person Tsyvarev; 20.04.2016

Если вы работаете на платформе Linux/POSIX, вам повезло, потому что мьютексы C++ созданы по образцу мьютексов POSIX. POSIX предоставляет больше возможностей, в том числе рекурсивность, совместное использование процессов и многое другое. Оборачивать примитивы POSIX в классы C++ несложно.

Хорошая точка входа в документацию по потокам POSIX.

person Maxim Egorushkin    schedule 14.04.2016
comment
такой функции, используя только стандартный C++? вы упускаете суть. вы можете написать что угодно с помощью собственного API, но OP явно хочет чего-то стандартного - person David Haim; 14.04.2016
comment
@DavidHaim К сожалению, мьютексы требуют взаимодействия с ОС, чтобы поместить поток в очередь ожидания мьютексов, которая поддерживается ядром. - person Maxim Egorushkin; 14.04.2016
comment
так что это должен был быть ваш ответ вместо этого, вы не можете - person David Haim; 14.04.2016
comment
@DavidHaim Было бы непрактично игнорировать функции ОС. Я бы предпочел иметь один и тот же интерфейс с реализациями для разных ОС. - person Maxim Egorushkin; 14.04.2016
comment
@DavidHaim но оператор явно хочет чего-то стандартного POSIX является стандартным , по общему признанию, не относящийся к C++. - person Andrew Henle; 14.04.2016
comment
то же самое касается тебя - person David Haim; 14.04.2016
comment
А теперь мальчики, ведите себя. Дэвид, я согласен, что это не совсем то, о чем просил Ральф, но это может быть ответом, который ему нужен. Я ответил на него более буквально, но я сомневаюсь, что это на самом деле более полезно. - person Useless; 14.04.2016

Вот быстрая обертка для обеспечения безопасности потоков вокруг типа T:

template<class T, class Lock>
struct lock_guarded {
  Lock l;
  T* t;
  T* operator->()&&{ return t; }
  template<class Arg>
  auto operator[](Arg&&arg)&&
  -> decltype(std::declval<T&>()[std::declval<Arg>()])
  {
    return (*t)[std::forward<Arg>(arg)];
  }
  T& operator*()&&{ return *t; }
};
constexpr struct emplace_t {} emplace {};
template<class T>
struct mutex_guarded {
  lock_guarded<T, std::unique_lock<std::mutex>>
  get_locked() {
    return {{m},&t};
  }
  lock_guarded<T const, std::unique_lock<std::mutex>>
  get_locked() const {
    return {{m},&t};
  }
  lock_guarded<T, std::unique_lock<std::mutex>>
  operator->() {
    return get_locked();
  }
  lock_guarded<T const, std::unique_lock<std::mutex>>
  operator->() const {
    return get_locked();
  }
  template<class F>
  std::result_of_t<F(T&)>
  operator->*(F&& f) {
    return std::forward<F>(f)(*get_locked());
  }
  template<class F>
  std::result_of_t<F(T const&)>
  operator->*(F&& f) const {
    return std::forward<F>(f)(*get_locked());
  }
  template<class...Args>
  mutex_guarded(emplace_t, Args&&...args):
    t(std::forward<Args>(args)...)
  {}
  mutex_guarded(mutex_guarded&& o):
    t( std::move(*o.get_locked()) )
  {}
  mutex_guarded(mutex_guarded const& o):
    t( *o.get_locked() )
  {}
  mutex_guarded() = default;
  ~mutex_guarded() = default;
  mutex_guarded& operator=(mutex_guarded&& o)
  {
    T tmp = std::move(o.get_locked());
    *get_locked() = std::move(tmp);
    return *this;
  }
  mutex_guarded& operator=(mutex_guarded const& o):
  {
    T tmp = o.get_locked();
    *get_locked() = std::move(tmp);
    return *this;
  }

private:
  std::mutex m;
  T t;
};

Вы можете использовать:

mutex_guarded<std::vector<int>> guarded;
auto s0 = guarded->size();
auto s1 = guarded->*[](auto&&e){return e.size();};

оба делают примерно одно и то же, и доступ к охраняемому объекту возможен только тогда, когда мьютекс заблокирован.

Воруя из ответа @tsyvarev (с небольшими изменениями), мы получаем:

class shared_recursive_mutex
{
  std::shared_mutex m
public:
  void lock(void) {
    std::thread::id this_id = std::this_thread::get_id();
    if(owner == this_id) {
      // recursive locking
      ++count;
    } else {
      // normal locking
      m.lock();
      owner = this_id;
      count = 1;
    }
  }
  void unlock(void) {
    if(count > 1) {
      // recursive unlocking
      count--;
    } else {
      // normal unlocking
      owner = std::thread::id();
      count = 0;
      m.unlock();
    }
  }
  void lock_shared() {
    std::thread::id this_id = std::this_thread::get_id();
    if (shared_counts->count(this_id)) {
      ++(shared_count.get_locked()[this_id]);
    } else {
      m.lock_shared();
      shared_count.get_locked()[this_id] = 1;
    }
  }
  void unlock_shared() {
    std::thread::id this_id = std::this_thread::get_id();
    auto it = shared_count->find(this_id);
    if (it->second > 1) {
      --(it->second);
    } else {
      shared_count->erase(it);
      m.unlock_shared();
    }
  }
private:
  std::atomic<std::thread::id> owner;
  std::atomic<std::size_t> count;
  mutex_guarded<std::map<std::thread::id, std::size_t>> shared_counts;
};

try_lock и try_lock_shared оставили в качестве упражнения.

И блокировка, и разблокировка совместно блокируют мьютекс дважды (это безопасно, так как ветки действительно о том, «контролирует ли этот поток мьютекс», и другой поток не может изменить этот ответ с «нет» на «да» или наоборот) . Вы могли бы сделать это с одной блокировкой с ->* вместо ->, что сделало бы это быстрее (за счет некоторой сложности в логике).


Вышеупомянутое не поддерживает монопольную блокировку, а затем совместную блокировку. Это сложно. Он не может поддерживать общую блокировку, а затем обновление до уникальной блокировки, потому что в принципе невозможно предотвратить взаимоблокировку, когда 2 потока пытаются это сделать.

Эта последняя проблема может быть причиной того, почему рекурсивные общие мьютексы — плохая идея.

person Yakk - Adam Nevraumont    schedule 20.04.2016

Возможно создать общий рекурсивный мьютекс, используя существующие примитивы. Однако я не рекомендую этого делать.

Это не просто, и обертка существующей реализации POSIX (или любой другой, родной для вашей платформы), скорее всего, будет более эффективной.

Если вы решите написать собственную реализацию, ее эффективность по-прежнему будет зависеть от особенностей конкретной платформы, поэтому вы либо пишете интерфейс с разными реализациями для каждой платформы, либо выбираете платформу и могли бы так же легко использовать собственные средства (POSIX или что-то еще).

Я, конечно, не собираюсь приводить пример реализации рекурсивной блокировки чтения/записи, потому что это совершенно неразумный объем работы для ответа на вопрос о переполнении стека.

person Useless    schedule 14.04.2016

Делюсь своей реализацией, никаких обещаний

recursive_shared_mutex.h

#ifndef _RECURSIVE_SHARED_MUTEX_H
#define _RECURSIVE_SHARED_MUTEX_H

#include <thread>
#include <mutex>
#include <map>

struct recursive_shared_mutex
{
public:

    recursive_shared_mutex() :
        m_mtx{}, m_exclusive_thread_id{}, m_exclusive_count{ 0 }, m_shared_locks{}
    {}

    void lock();
    bool try_lock();
    void unlock();

    void lock_shared();
    bool try_lock_shared();
    void unlock_shared();

    recursive_shared_mutex(const recursive_shared_mutex&) = delete;
    recursive_shared_mutex& operator=(const recursive_shared_mutex&) = delete;

private:

    inline bool is_exclusive_locked()
    {
        return m_exclusive_count > 0;
    }

    inline bool is_shared_locked()
    {
        return m_shared_locks.size() > 0;
    }

    inline bool can_exclusively_lock()
    {
        return can_start_exclusive_lock() || can_increment_exclusive_lock();
    }

    inline bool can_start_exclusive_lock()
    {
        return !is_exclusive_locked() && (!is_shared_locked() || is_shared_locked_only_on_this_thread());
    }

    inline bool can_increment_exclusive_lock()
    {
        return is_exclusive_locked_on_this_thread();
    }

    inline bool can_lock_shared()
    {
        return !is_exclusive_locked() || is_exclusive_locked_on_this_thread();
    }

    inline bool is_shared_locked_only_on_this_thread()
    {
        return is_shared_locked_only_on_thread(std::this_thread::get_id());
    }

    inline bool is_shared_locked_only_on_thread(std::thread::id id)
    {
        return m_shared_locks.size() == 1 && m_shared_locks.find(id) != m_shared_locks.end();
    }

    inline bool is_exclusive_locked_on_this_thread()
    {
        return is_exclusive_locked_on_thread(std::this_thread::get_id());
    }

    inline bool is_exclusive_locked_on_thread(std::thread::id id)
    {
        return m_exclusive_count > 0 && m_exclusive_thread_id == id;
    }

    inline void start_exclusive_lock()
    {
        m_exclusive_thread_id = std::this_thread::get_id();
        m_exclusive_count++;
    }

    inline void increment_exclusive_lock()
    {
        m_exclusive_count++;
    }

    inline void decrement_exclusive_lock()
    {
        if (m_exclusive_count == 0)
        {
            throw std::logic_error("Not exclusively locked, cannot exclusively unlock");
        }
        if (m_exclusive_thread_id == std::this_thread::get_id())
        {
            m_exclusive_count--;
        }
        else
        {
            throw std::logic_error("Calling exclusively unlock from the wrong thread");
        }
    }

    inline void increment_shared_lock()
    {
        increment_shared_lock(std::this_thread::get_id());
    }

    inline void increment_shared_lock(std::thread::id id)
    {
        if (m_shared_locks.find(id) == m_shared_locks.end())
        {
            m_shared_locks[id] = 1;
        }
        else
        {
            m_shared_locks[id] += 1;
        }
    }

    inline void decrement_shared_lock()
    {
        decrement_shared_lock(std::this_thread::get_id());
    }

    inline void decrement_shared_lock(std::thread::id id)
    {
        if (m_shared_locks.size() == 0)
        {
            throw std::logic_error("Not shared locked, cannot shared unlock");
        }
        if (m_shared_locks.find(id) == m_shared_locks.end())
        {
            throw std::logic_error("Calling shared unlock from the wrong thread");
        }
        else
        {
            if (m_shared_locks[id] == 1)
            {
                m_shared_locks.erase(id);
            }
            else
            {
                m_shared_locks[id] -= 1;
            }
        }
    }

    std::mutex m_mtx;
    std::thread::id m_exclusive_thread_id;
    size_t m_exclusive_count;
    std::map<std::thread::id, size_t> m_shared_locks;
    std::condition_variable m_cond_var;
};

#endif

recursive_shared_mutex.cpp

#include "recursive_shared_mutex.h"
#include <condition_variable>

void recursive_shared_mutex::lock()
{
    std::unique_lock sync_lock(m_mtx);
    m_cond_var.wait(sync_lock, [this] { return can_exclusively_lock(); });
    if (is_exclusive_locked_on_this_thread())
    {
        increment_exclusive_lock();
    }
    else
    {
        start_exclusive_lock();
    }
}

bool recursive_shared_mutex::try_lock()
{
    std::unique_lock sync_lock(m_mtx);
    if (can_increment_exclusive_lock())
    {
        increment_exclusive_lock();
        return true;
    }
    if (can_start_exclusive_lock())
    {
        start_exclusive_lock();
        return true;
    }
    return false;
}

void recursive_shared_mutex::unlock()
{
    {
        std::unique_lock sync_lock(m_mtx);
        decrement_exclusive_lock();
    }
    m_cond_var.notify_all();
}

void recursive_shared_mutex::lock_shared()
{
    std::unique_lock sync_lock(m_mtx);
    m_cond_var.wait(sync_lock, [this] { return can_lock_shared(); });
    increment_shared_lock();
}

bool recursive_shared_mutex::try_lock_shared()
{
    std::unique_lock sync_lock(m_mtx);
    if (can_lock_shared())
    {
        increment_shared_lock();
        return true;
    }
    return false;
}

void recursive_shared_mutex::unlock_shared()
{
    {
        std::unique_lock sync_lock(m_mtx);
        decrement_shared_lock();
    }
    m_cond_var.notify_all();
}

Если поток владеет общей блокировкой, он также может получить эксклюзивную блокировку, не отказываясь от своей общей блокировки. (Это, конечно, требует, чтобы ни один другой поток в настоящее время не имел общей или монопольной блокировки)

И наоборот, поток, владеющий эксклюзивной блокировкой, может получить общую блокировку.

Интересно, что эти свойства также позволяют обновлять/откатывать замки.

Временное обновление замка:

recusrive_shared_mutex mtx;
foo bar;

mtx.lock_shared();
if (bar.read() == x)
{
    mtx.lock();
    bar.write(y);
    mtx.unlock();
}
mtx.unlock_shared();

Переход с эксклюзивной блокировки на общую блокировку

recusrive_shared_mutex mtx;
foo bar;

mtx.lock();
bar.write(x);
mtx.lock_shared();
mtx.unlock();
while (bar.read() != y)
{
     // Something
}
mtx.unlock_shared();
person pix64    schedule 03.02.2020