#include #include #include #include // std::pair, std::move, std::forward #include #include // aligned_storage_t #include #include #include #include #include "libipc/ipc.h" #include "libipc/def.h" #include "libipc/shm.h" #include "libipc/pool_alloc.h" #include "libipc/queue.h" #include "libipc/policy.h" #include "libipc/rw_lock.h" #include "libipc/waiter.h" #include "libipc/utility/log.h" #include "libipc/utility/id_pool.h" #include "libipc/utility/scope_guard.h" #include "libipc/utility/utility.h" #include "libipc/memory/resource.h" #include "libipc/platform/detail.h" #include "libipc/circ/elem_array.h" namespace { using msg_id_t = std::uint32_t; using acc_t = std::atomic; template struct msg_t; template struct msg_t<0, AlignSize> { msg_id_t cc_id_; msg_id_t id_; std::int32_t remain_; bool storage_; }; template struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) : msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { if (data != nullptr) { // copy storage-id *reinterpret_cast(&data_) = *static_cast(data); } } else std::memcpy(&data_, data, size); } }; template ipc::buff_t make_cache(T& data, std::size_t size) { auto ptr = ipc::mem::alloc(size); std::memcpy(ptr, &data, (ipc::detail::min)(sizeof(data), size)); return { ptr, size, ipc::mem::free }; } struct cache_t { std::size_t fill_; ipc::buff_t buff_; cache_t(std::size_t f, ipc::buff_t && b) : fill_(f), buff_(std::move(b)) {} void append(void const * data, std::size_t size) { if (fill_ >= buff_.size() || data == nullptr || size == 0) return; auto new_fill = (ipc::detail::min)(fill_ + size, buff_.size()); std::memcpy(static_cast(buff_.data()) + fill_, data, new_fill - fill_); fill_ = new_fill; } }; auto cc_acc() { static ipc::shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); return static_cast(acc_h.get()); } IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align; } IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { return ipc::make_align(alignof(std::max_align_t), align_chunk_size( ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)) + size)); } struct chunk_t { std::atomic &conns() noexcept { return *reinterpret_cast *>(this); } void *data() noexcept { return reinterpret_cast(this) + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)); } }; struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { return ipc::id_pool<>::max_count * chunk_size; } ipc::byte_t *chunks_mem() noexcept { return reinterpret_cast(this + 1); } chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { if (id < 0) return nullptr; return reinterpret_cast(chunks_mem() + (chunk_size * id)); } }; auto& chunk_storages() { class chunk_handle_t { ipc::shm::handle handle_; public: chunk_info_t *get_info(std::size_t chunk_size) { if (!handle_.valid() && !handle_.acquire( ("__CHUNK_INFO__" + ipc::to_string(chunk_size)).c_str(), sizeof(chunk_info_t) + chunk_info_t::chunks_mem_size(chunk_size) )) { ipc::error("[chunk_storages] chunk_shm.id_info_.acquire failed: chunk_size = %zd\n", chunk_size); return nullptr; } auto info = static_cast(handle_.get()); if (info == nullptr) { ipc::error("[chunk_storages] chunk_shm.id_info_.get failed: chunk_size = %zd\n", chunk_size); return nullptr; } return info; } }; static ipc::map chunk_hs; return chunk_hs; } chunk_info_t *chunk_storage_info(std::size_t chunk_size) { auto &storages = chunk_storages(); std::decay_t::iterator it; { static ipc::rw_lock lock; IPC_UNUSED_ std::shared_lock guard {lock}; if ((it = storages.find(chunk_size)) == storages.end()) { using chunk_handle_t = std::decay_t::value_type::second_type; guard.unlock(); IPC_UNUSED_ std::lock_guard guard {lock}; it = storages.emplace(chunk_size, chunk_handle_t{}).first; } } return it->second.get_info(chunk_size); } std::pair acquire_storage(std::size_t size, ipc::circ::cc_t conns) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; info->lock_.lock(); info->pool_.prepare(); // got an unique id auto id = info->pool_.acquire(); info->lock_.unlock(); auto chunk = info->at(chunk_size, id); if (chunk == nullptr) return {}; chunk->conns().store(conns, std::memory_order_relaxed); return { id, chunk->data() }; } void *find_storage(ipc::storage_id_t id, std::size_t size) { if (id < 0) { ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; return info->at(chunk_size, id)->data(); } void release_storage(ipc::storage_id_t id, std::size_t size) { if (id < 0) { ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } template bool sub_rc(ipc::wr, std::atomic &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept { return true; } template bool sub_rc(ipc::wr, std::atomic &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept { auto last_conns = curr_conns & ~conn_id; for (unsigned k = 0;;) { auto chunk_conns = conns.load(std::memory_order_acquire); if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) { return (chunk_conns & last_conns) == 0; } ipc::yield(k); } } template void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) { if (id < 0) { ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; auto chunk = info->at(chunk_size, id); if (chunk == nullptr) return; if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) { return; } info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } template bool clear_message(void* p) { auto msg = static_cast(p); if (msg->storage_) { std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; if (r_size <= 0) { ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size); return true; } release_storage( *reinterpret_cast(&msg->data_), static_cast(r_size)); } return true; } struct conn_info_head { ipc::string name_; msg_id_t cc_id_; // connection-info id ipc::detail::waiter cc_waiter_, wt_waiter_, rd_waiter_; ipc::shm::handle acc_h_; conn_info_head(char const * name) : name_ {name} , cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)} , cc_waiter_{("__CC_CONN__" + name_).c_str()} , wt_waiter_{("__WT_CONN__" + name_).c_str()} , rd_waiter_{("__RD_CONN__" + name_).c_str()} , acc_h_ {("__AC_CONN__" + name_).c_str(), sizeof(acc_t)} { } void quit_waiting() { cc_waiter_.quit_waiting(); wt_waiter_.quit_waiting(); rd_waiter_.quit_waiting(); } auto acc() { return static_cast(acc_h_.get()); } auto& recv_cache() { thread_local ipc::unordered_map tls; return tls; } }; template bool wait_for(W& waiter, F&& pred, std::uint64_t tm) { if (tm == 0) return !pred(); for (unsigned k = 0; pred();) { bool ret = true; ipc::sleep(k, [&k, &ret, &waiter, &pred, tm] { ret = waiter.wait_if(std::forward(pred), tm); k = 0; }); if (!ret) return false; // timeout or fail if (k == 0) break; // k has been reset } return true; } template struct queue_generator { using queue_t = ipc::queue, Policy>; struct conn_info_t : conn_info_head { queue_t que_; conn_info_t(char const * name) : conn_info_head{name} , que_{("__QU_CONN__" + ipc::to_string(DataSize) + "__" + ipc::to_string(AlignSize) + "__" + name).c_str()} { } void disconnect_receiver() { bool dis = que_.disconnect(); this->quit_waiting(); if (dis) { this->recv_cache().clear(); } } }; }; template struct detail_impl { using policy_t = Policy; using flag_t = typename policy_t::flag_t; using queue_t = typename queue_generator::queue_t; using conn_info_t = typename queue_generator::conn_info_t; constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept { return static_cast(h); } constexpr static queue_t* queue_of(ipc::handle_t h) noexcept { return (info_of(h) == nullptr) ? nullptr : &(info_of(h)->que_); } /* API implementations */ static void disconnect(ipc::handle_t h) { auto que = queue_of(h); if (que == nullptr) { return; } que->shut_sending(); assert(info_of(h) != nullptr); info_of(h)->disconnect_receiver(); } static bool reconnect(ipc::handle_t * ph, bool start_to_recv) { assert(ph != nullptr); assert(*ph != nullptr); auto que = queue_of(*ph); if (que == nullptr) { return false; } if (start_to_recv) { que->shut_sending(); if (que->connect()) { // wouldn't connect twice info_of(*ph)->cc_waiter_.broadcast(); return true; } return false; } // start_to_recv == false if (que->connected()) { info_of(*ph)->disconnect_receiver(); } return que->ready_sending(); } static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) { assert(ph != nullptr); if (*ph == nullptr) { *ph = ipc::mem::alloc(name); } return reconnect(ph, start_to_recv); } static void destroy(ipc::handle_t h) { disconnect(h); ipc::mem::free(info_of(h)); } static std::size_t recv_count(ipc::handle_t h) noexcept { auto que = queue_of(h); if (que == nullptr) { return ipc::invalid_value; } return que->conn_count(); } static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm) { auto que = queue_of(h); if (que == nullptr) { return false; } return wait_for(info_of(h)->cc_waiter_, [que, r_count] { return que->conn_count() < r_count; }, tm); } template static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t size) { if (data == nullptr || size == 0) { ipc::error("fail: send(%p, %zd)\n", data, size); return false; } auto que = queue_of(h); if (que == nullptr) { ipc::error("fail: send, queue_of(h) == nullptr\n"); return false; } if (que->elems() == nullptr) { ipc::error("fail: send, queue_of(h)->elems() == nullptr\n"); return false; } if (!que->ready_sending()) { ipc::error("fail: send, que->ready_sending() == false\n"); return false; } ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed); if (conns == 0) { ipc::error("fail: send, there is no receiver on this connection.\n"); return false; } // calc a new message id auto acc = info_of(h)->acc(); if (acc == nullptr) { ipc::error("fail: send, info_of(h)->acc() == nullptr\n"); return false; } auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { auto dat = acquire_storage(size, conns); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); return try_push(static_cast(size) - static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; for (std::int32_t i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), static_cast(data) + offset, ipc::data_length)) { return false; } } // if remain > 0, this is the last message fragment std::int32_t remain = static_cast(size) - offset; if (remain > 0) { if (!try_push(remain - static_cast(ipc::data_length), static_cast(data) + offset, static_cast(remain))) { return false; } } return true; } static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); if (!que->force_push( clear_message, info->cc_id_, msg_id, remain, data, size)) { return false; } } info->rd_waiter_.broadcast(); return true; }; }, h, data, size); } static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; } info->rd_waiter_.broadcast(); return true; }; }, h, data, size); } static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) { auto que = queue_of(h); if (que == nullptr) { ipc::error("fail: recv, queue_of(h) == nullptr\n"); return {}; } if (!que->connected()) { // hasn't connected yet, just return. return {}; } auto& rc = info_of(h)->recv_cache(); for (;;) { // pop a new message typename queue_t::value_t msg; if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { // pop failed, just return. return {}; } info_of(h)->wt_waiter_.broadcast(); if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length std::int32_t r_size = static_cast(ipc::data_length) + msg.remain_; if (r_size <= 0) { ipc::error("fail: recv, r_size = %d\n", (int)r_size); return {}; } std::size_t msg_size = static_cast(r_size); // large message if (msg.storage_) { ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); void* buf = find_storage(buf_id, msg_size); if (buf != nullptr) { struct recycle_t { ipc::storage_id_t storage_id; ipc::circ::cc_t curr_conns; ipc::circ::cc_t conn_id; } *r_info = ipc::mem::alloc(recycle_t{ buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id() }); if (r_info == nullptr) { ipc::log("fail: ipc::mem::alloc.\n"); return ipc::buff_t{buf, msg_size}; // no recycle } else { return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) { auto r_info = static_cast(p_info); IPC_UNUSED_ auto finally = ipc::guard([r_info] { ipc::mem::free(r_info); }); recycle_storage(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id); }, r_info}; } } else { ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); continue; } } // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { if (msg_size <= ipc::data_length) { return make_cache(msg.data_, msg_size); } // gc if (rc.size() > 1024) { std::vector need_del; for (auto const & pair : rc) { auto cmp = std::minmax(msg.id_, pair.first); if (cmp.second - cmp.first > 8192) { need_del.push_back(pair.first); } } for (auto id : need_del) rc.erase(id); } // cache the first message fragment rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment if (msg.remain_ <= 0) { cac.append(&(msg.data_), msg_size); // finish this message, erase it from cache auto buff = std::move(cac.buff_); rc.erase(cac_it); return buff; } // there are remain datas after this message cac.append(&(msg.data_), ipc::data_length); } } } static ipc::buff_t try_recv(ipc::handle_t h) { return recv(h, 0); } }; // detail_impl template using policy_t = ipc::policy::choose; } // internal-linkage namespace ipc { template ipc::handle_t chan_impl::inited() { ipc::detail::waiter::init(); return nullptr; } template bool chan_impl::connect(ipc::handle_t * ph, char const * name, unsigned mode) { return detail_impl>::connect(ph, name, mode & receiver); } template bool chan_impl::reconnect(ipc::handle_t * ph, unsigned mode) { return detail_impl>::reconnect(ph, mode & receiver); } template void chan_impl::disconnect(ipc::handle_t h) { detail_impl>::disconnect(h); } template void chan_impl::destroy(ipc::handle_t h) { detail_impl>::destroy(h); } template char const * chan_impl::name(ipc::handle_t h) { auto info = detail_impl>::info_of(h); return (info == nullptr) ? nullptr : info->name_.c_str(); } template std::size_t chan_impl::recv_count(ipc::handle_t h) { return detail_impl>::recv_count(h); } template bool chan_impl::wait_for_recv(ipc::handle_t h, std::size_t r_count, std::uint64_t tm) { return detail_impl>::wait_for_recv(h, r_count, tm); } template bool chan_impl::send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return detail_impl>::send(h, data, size, tm); } template buff_t chan_impl::recv(ipc::handle_t h, std::uint64_t tm) { return detail_impl>::recv(h, tm); } template bool chan_impl::try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return detail_impl>::try_send(h, data, size, tm); } template buff_t chan_impl::try_recv(ipc::handle_t h) { return detail_impl>::try_recv(h); } template struct chan_impl>; // template struct chan_impl>; // TBD // template struct chan_impl>; // TBD template struct chan_impl>; template struct chan_impl>; } // namespace ipc