#ifndef UTIL_PCQUEUE_H #define UTIL_PCQUEUE_H #include "exception.hh" #include #include #include #include #include #ifdef __APPLE__ #include #include #include #include #endif // __APPLE__ namespace util { /* OS X Maverick and Boost interprocess were doing "Function not implemented." * So this is my own wrapper around the mach kernel APIs. */ #ifdef __APPLE__ #define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure") class Semaphore { public: explicit Semaphore(int value) : task_(mach_task_self()) { MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value)); } ~Semaphore() { MACH_CALL(semaphore_destroy(task_, back_)); } void wait() { MACH_CALL(semaphore_wait(back_)); } void post() { MACH_CALL(semaphore_signal(back_)); } private: semaphore_t back_; task_t task_; }; inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); } #else typedef boost::interprocess::interprocess_semaphore Semaphore; inline void WaitSemaphore (Semaphore &on) { while (1) { try { on.wait(); break; } catch (boost::interprocess::interprocess_exception &e) { if (e.get_native_error() != EINTR) { throw; } } } } #endif // __APPLE__ /** * Producer consumer queue safe for multiple producers and multiple consumers. * T must be default constructable and have operator=. * The value is copied twice for Consume(T &out) or three times for Consume(), * so larger objects should be passed via pointer. * Strong exception guarantee if operator= throws. Undefined if semaphores throw. */ template class PCQueue : boost::noncopyable { public: explicit PCQueue(size_t size) : empty_(size), used_(0), storage_(new T[size]), end_(storage_.get() + size), produce_at_(storage_.get()), consume_at_(storage_.get()) {} // Add a value to the queue. void Produce(const T &val) { WaitSemaphore(empty_); { boost::unique_lock produce_lock(produce_at_mutex_); try { *produce_at_ = val; } catch (...) { empty_.post(); throw; } if (++produce_at_ == end_) produce_at_ = storage_.get(); } used_.post(); } // Consume a value, assigning it to out. T& Consume(T &out) { WaitSemaphore(used_); { boost::unique_lock consume_lock(consume_at_mutex_); try { out = *consume_at_; } catch (...) { used_.post(); throw; } if (++consume_at_ == end_) consume_at_ = storage_.get(); } empty_.post(); return out; } // Convenience version of Consume that copies the value to return. // The other version is faster. T Consume() { T ret; Consume(ret); return ret; } private: // Number of empty spaces in storage_. Semaphore empty_; // Number of occupied spaces in storage_. Semaphore used_; boost::scoped_array storage_; T *const end_; // Index for next write in storage_. T *produce_at_; boost::mutex produce_at_mutex_; // Index for next read from storage_. T *consume_at_; boost::mutex consume_at_mutex_; }; } // namespace util #endif // UTIL_PCQUEUE_H