File size: 3,480 Bytes
1ce325b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
#ifndef UTIL_PCQUEUE_H
#define UTIL_PCQUEUE_H
#include "exception.hh"
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/utility.hpp>
#include <cerrno>
#ifdef __APPLE__
#include <mach/semaphore.h>
#include <mach/task.h>
#include <mach/mach_traps.h>
#include <mach/mach.h>
#endif // __APPLE__
namespace util {
/* OS X Maverick and Boost interprocess were doing "Function not implemented."
* So this is my own wrapper around the mach kernel APIs.
*/
#ifdef __APPLE__
#define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure")
class Semaphore {
public:
explicit Semaphore(int value) : task_(mach_task_self()) {
MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value));
}
~Semaphore() {
MACH_CALL(semaphore_destroy(task_, back_));
}
void wait() {
MACH_CALL(semaphore_wait(back_));
}
void post() {
MACH_CALL(semaphore_signal(back_));
}
private:
semaphore_t back_;
task_t task_;
};
inline void WaitSemaphore(Semaphore &semaphore) {
semaphore.wait();
}
#else
typedef boost::interprocess::interprocess_semaphore Semaphore;
inline void WaitSemaphore (Semaphore &on) {
while (1) {
try {
on.wait();
break;
}
catch (boost::interprocess::interprocess_exception &e) {
if (e.get_native_error() != EINTR) {
throw;
}
}
}
}
#endif // __APPLE__
/**
* Producer consumer queue safe for multiple producers and multiple consumers.
* T must be default constructable and have operator=.
* The value is copied twice for Consume(T &out) or three times for Consume(),
* so larger objects should be passed via pointer.
* Strong exception guarantee if operator= throws. Undefined if semaphores throw.
*/
template <class T> class PCQueue : boost::noncopyable {
public:
explicit PCQueue(size_t size)
: empty_(size), used_(0),
storage_(new T[size]),
end_(storage_.get() + size),
produce_at_(storage_.get()),
consume_at_(storage_.get()) {}
// Add a value to the queue.
void Produce(const T &val) {
WaitSemaphore(empty_);
{
boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_);
try {
*produce_at_ = val;
}
catch (...) {
empty_.post();
throw;
}
if (++produce_at_ == end_) produce_at_ = storage_.get();
}
used_.post();
}
// Consume a value, assigning it to out.
T& Consume(T &out) {
WaitSemaphore(used_);
{
boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_);
try {
out = *consume_at_;
}
catch (...) {
used_.post();
throw;
}
if (++consume_at_ == end_) consume_at_ = storage_.get();
}
empty_.post();
return out;
}
// Convenience version of Consume that copies the value to return.
// The other version is faster.
T Consume() {
T ret;
Consume(ret);
return ret;
}
private:
// Number of empty spaces in storage_.
Semaphore empty_;
// Number of occupied spaces in storage_.
Semaphore used_;
boost::scoped_array<T> storage_;
T *const end_;
// Index for next write in storage_.
T *produce_at_;
boost::mutex produce_at_mutex_;
// Index for next read from storage_.
T *consume_at_;
boost::mutex consume_at_mutex_;
};
} // namespace util
#endif // UTIL_PCQUEUE_H
|