#ifndef UTIL_THREAD_POOL_H #define UTIL_THREAD_POOL_H #include "util/pcqueue.hh" #include #include #include #include #include namespace util { template class Worker : boost::noncopyable { public: typedef HandlerT Handler; typedef typename Handler::Request Request; template Worker(PCQueue &in, Construct &construct, const Request &poison) : in_(in), handler_(construct), poison_(poison), thread_(boost::ref(*this)) {} // Only call from thread. void operator()() { Request request; while (1) { in_.Consume(request); if (request == poison_) return; try { (*handler_)(request); } catch(const std::exception &e) { std::cerr << "Handler threw " << e.what() << std::endl; abort(); } catch(...) { std::cerr << "Handler threw an exception, dropping request" << std::endl; abort(); } } } void Join() { thread_.join(); } private: PCQueue &in_; boost::optional handler_; const Request poison_; boost::thread thread_; }; template class ThreadPool : boost::noncopyable { public: typedef HandlerT Handler; typedef typename Handler::Request Request; template ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) { for (size_t i = 0; i < workers; ++i) { workers_.push_back(new Worker(in_, handler_construct, poison)); } } ~ThreadPool() { for (size_t i = 0; i < workers_.size(); ++i) { Produce(poison_); } for (typename boost::ptr_vector >::iterator i = workers_.begin(); i != workers_.end(); ++i) { i->Join(); } } void Produce(const Request &request) { in_.Produce(request); } // For adding to the queue. PCQueue &In() { return in_; } private: PCQueue in_; boost::ptr_vector > workers_; Request poison_; }; } // namespace util #endif // UTIL_THREAD_POOL_H