File size: 1,546 Bytes
1ce325b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
#include "parallel_read.hh"
#include "file.hh"
#ifdef WITH_THREADS
#include "thread_pool.hh"
namespace util {
namespace {
// Handler type plugged into ThreadPool below: it holds a file descriptor and
// services one positional read per call.
class Reader {
  public:
    // One unit of work: read `size` bytes from file position `offset` into
    // the memory at `to`.
    struct Request {
      void *to;
      std::size_t size;
      uint64_t offset;

      // Field-by-field equality.
      // NOTE(review): presumably ThreadPool uses this to recognize the poison
      // request it is constructed with -- confirm in thread_pool.hh.
      bool operator==(const Request &rhs) const {
        if (to != rhs.to) return false;
        if (size != rhs.size) return false;
        return offset == rhs.offset;
      }
    };

    // Remembers the descriptor that every request will be read from.
    explicit Reader(int fd) : fd_(fd) {}

    // Carries out a single request by forwarding it to util::ErsatzPRead.
    void operator()(const Request &request) {
      util::ErsatzPRead(fd_, request.to, request.size, request.offset);
    }

  private:
    int fd_;
};
} // namespace
// Reads `amount` bytes of `fd`, starting at file position `offset`, into the
// buffer at `to`. The range is cut into batches of at most 32 MB and each
// batch is submitted as a separate request to a pool of Reader workers.
// NOTE(review): the function returns when `pool` goes out of scope;
// presumably the ThreadPool destructor waits for outstanding requests --
// confirm in thread_pool.hh.
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
  // Sentinel handed to the pool. No request produced below can compare equal
  // to it because every produced request has a non-zero size.
  Reader::Request poison;
  poison.offset = 0;
  poison.size = 0;
  poison.to = NULL;

  unsigned threads = boost::thread::hardware_concurrency();
  // A reported concurrency of 0 is replaced by two worker threads.
  if (threads == 0) threads = 2;
  ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison);

  const std::size_t kBatch = 1ULL << 25; // 32 MB

  // Cursors tracking where the next batch lands in memory and in the file.
  uint8_t *dest = static_cast<uint8_t*>(to);
  uint64_t position = offset;

  Reader::Request request;
  request.size = kBatch;
  // Full-size batches: keep going while strictly more than one batch is left.
  while (amount > kBatch) {
    request.to = dest;
    request.offset = position;
    pool.Produce(request);
    dest += kBatch;
    position += kBatch;
    amount -= kBatch;
  }

  // Whatever is left (between 1 and kBatch bytes); skipped for an empty read.
  if (amount) {
    request.to = dest;
    request.offset = position;
    request.size = amount;
    pool.Produce(request);
  }
}
} // namespace util
#else // WITH_THREADS
namespace util {
// Fallback compiled when WITH_THREADS is not defined: the whole read of
// `amount` bytes from file position `offset` of `fd` into `to` is delegated to
// a single util::ErsatzPRead call on the calling thread.
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
util::ErsatzPRead(fd, to, amount, offset);
}
} // namespace util
#endif
|