|
#pragma once |
|
|
|
#include <type_traits> |
|
#include <new> |
|
#include <utility> |
|
#include <algorithm> |
|
#include <atomic> |
|
#include <tuple> |
|
#include <thread> |
|
#include <chrono> |
|
#include <string> |
|
#include <cassert> |
|
|
|
#include "libipc/def.h" |
|
#include "libipc/shm.h" |
|
#include "libipc/rw_lock.h" |
|
|
|
#include "libipc/utility/log.h" |
|
#include "libipc/platform/detail.h" |
|
#include "libipc/circ/elem_def.h" |
|
|
|
namespace ipc { |
|
namespace detail { |
|
|
|
class queue_conn { |
|
protected: |
|
circ::cc_t connected_ = 0; |
|
shm::handle elems_h_; |
|
|
|
template <typename Elems> |
|
Elems* open(char const * name) { |
|
if (name == nullptr || name[0] == '\0') { |
|
ipc::error("fail open waiter: name is empty!\n"); |
|
return nullptr; |
|
} |
|
if (!elems_h_.acquire(name, sizeof(Elems))) { |
|
return nullptr; |
|
} |
|
auto elems = static_cast<Elems*>(elems_h_.get()); |
|
if (elems == nullptr) { |
|
ipc::error("fail acquire elems: %s\n", name); |
|
return nullptr; |
|
} |
|
elems->init(); |
|
return elems; |
|
} |
|
|
|
void close() { |
|
elems_h_.release(); |
|
} |
|
|
|
public: |
|
queue_conn() = default; |
|
queue_conn(const queue_conn&) = delete; |
|
queue_conn& operator=(const queue_conn&) = delete; |
|
|
|
bool connected() const noexcept { |
|
return connected_ != 0; |
|
} |
|
|
|
circ::cc_t connected_id() const noexcept { |
|
return connected_; |
|
} |
|
|
|
template <typename Elems> |
|
auto connect(Elems* elems) noexcept |
|
|
|
-> std::tuple<bool, bool, decltype(std::declval<Elems>().cursor())> { |
|
if (elems == nullptr) return {}; |
|
|
|
if (connected()) return {connected(), false, 0}; |
|
connected_ = elems->connect_receiver(); |
|
return {connected(), true, elems->cursor()}; |
|
} |
|
|
|
template <typename Elems> |
|
bool disconnect(Elems* elems) noexcept { |
|
if (elems == nullptr) return false; |
|
|
|
if (!connected()) return false; |
|
elems->disconnect_receiver(std::exchange(connected_, 0)); |
|
return true; |
|
} |
|
}; |
|
|
|
template <typename Elems> |
|
class queue_base : public queue_conn { |
|
using base_t = queue_conn; |
|
|
|
public: |
|
using elems_t = Elems; |
|
using policy_t = typename elems_t::policy_t; |
|
|
|
protected: |
|
elems_t * elems_ = nullptr; |
|
decltype(std::declval<elems_t>().cursor()) cursor_ = 0; |
|
bool sender_flag_ = false; |
|
|
|
public: |
|
using base_t::base_t; |
|
|
|
queue_base() = default; |
|
|
|
explicit queue_base(char const * name) |
|
: queue_base{} { |
|
elems_ = open<elems_t>(name); |
|
} |
|
|
|
explicit queue_base(elems_t * elems) noexcept |
|
: queue_base{} { |
|
assert(elems != nullptr); |
|
elems_ = elems; |
|
} |
|
|
|
~queue_base() { |
|
base_t::close(); |
|
} |
|
|
|
elems_t * elems() noexcept { return elems_; } |
|
elems_t const * elems() const noexcept { return elems_; } |
|
|
|
bool ready_sending() noexcept { |
|
if (elems_ == nullptr) return false; |
|
return sender_flag_ || (sender_flag_ = elems_->connect_sender()); |
|
} |
|
|
|
void shut_sending() noexcept { |
|
if (elems_ == nullptr) return; |
|
if (!sender_flag_) return; |
|
elems_->disconnect_sender(); |
|
} |
|
|
|
bool connect() noexcept { |
|
auto tp = base_t::connect(elems_); |
|
if (std::get<0>(tp) && std::get<1>(tp)) { |
|
cursor_ = std::get<2>(tp); |
|
return true; |
|
} |
|
return std::get<0>(tp); |
|
} |
|
|
|
bool disconnect() noexcept { |
|
return base_t::disconnect(elems_); |
|
} |
|
|
|
std::size_t conn_count() const noexcept { |
|
return (elems_ == nullptr) ? static_cast<std::size_t>(invalid_value) : elems_->conn_count(); |
|
} |
|
|
|
bool valid() const noexcept { |
|
return elems_ != nullptr; |
|
} |
|
|
|
bool empty() const noexcept { |
|
return !valid() || (cursor_ == elems_->cursor()); |
|
} |
|
|
|
template <typename T, typename F, typename... P> |
|
bool push(F&& prep, P&&... params) { |
|
if (elems_ == nullptr) return false; |
|
return elems_->push(this, [&](void* p) { |
|
if (prep(p)) ::new (p) T(std::forward<P>(params)...); |
|
}); |
|
} |
|
|
|
template <typename T, typename F, typename... P> |
|
bool force_push(F&& prep, P&&... params) { |
|
if (elems_ == nullptr) return false; |
|
return elems_->force_push(this, [&](void* p) { |
|
if (prep(p)) ::new (p) T(std::forward<P>(params)...); |
|
}); |
|
} |
|
|
|
template <typename T, typename F> |
|
bool pop(T& item, F&& out) { |
|
if (elems_ == nullptr) { |
|
return false; |
|
} |
|
return elems_->pop(this, &(this->cursor_), [&item](void* p) { |
|
::new (&item) T(std::move(*static_cast<T*>(p))); |
|
}, std::forward<F>(out)); |
|
} |
|
}; |
|
|
|
} |
|
|
|
template <typename T, typename Policy> |
|
class queue final : public detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>> { |
|
using base_t = detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>>; |
|
|
|
public: |
|
using value_t = T; |
|
|
|
using base_t::base_t; |
|
|
|
template <typename... P> |
|
bool push(P&&... params) { |
|
return base_t::template push<T>(std::forward<P>(params)...); |
|
} |
|
|
|
template <typename... P> |
|
bool force_push(P&&... params) { |
|
return base_t::template force_push<T>(std::forward<P>(params)...); |
|
} |
|
|
|
bool pop(T& item) { |
|
return base_t::pop(item, [](bool) {}); |
|
} |
|
|
|
template <typename F> |
|
bool pop(T& item, F&& out) { |
|
return base_t::pop(item, std::forward<F>(out)); |
|
} |
|
}; |
|
|
|
} |
|
|