xls-r-uzbek-cv8 / kenlm /util /parallel_read.cc
lucio's picture
Training in progress, step 5000
8652957
raw
history blame
1.56 kB
#include "util/parallel_read.hh"
#include "util/file.hh"
#ifdef WITH_THREADS
#include "util/thread_pool.hh"
namespace util {
namespace {
class Reader {
public:
explicit Reader(int fd) : fd_(fd) {}
struct Request {
void *to;
std::size_t size;
uint64_t offset;
bool operator==(const Request &other) const {
return (to == other.to) && (size == other.size) && (offset == other.offset);
}
};
void operator()(const Request &request) {
util::ErsatzPRead(fd_, request.to, request.size, request.offset);
}
private:
int fd_;
};
} // namespace
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
Reader::Request poison;
poison.to = NULL;
poison.size = 0;
poison.offset = 0;
unsigned threads = boost::thread::hardware_concurrency();
if (!threads) threads = 2;
ThreadPool<Reader> pool(2 /* don't need much of a queue */, threads, fd, poison);
const std::size_t kBatch = 1ULL << 25; // 32 MB
Reader::Request request;
request.to = to;
request.size = kBatch;
request.offset = offset;
for (; amount > kBatch; amount -= kBatch) {
pool.Produce(request);
request.to = reinterpret_cast<uint8_t*>(request.to) + kBatch;
request.offset += kBatch;
}
request.size = amount;
if (request.size) {
pool.Produce(request);
}
}
} // namespace util
#else // WITH_THREADS
namespace util {
void ParallelRead(int fd, void *to, std::size_t amount, uint64_t offset) {
util::ErsatzPRead(fd, to, amount, offset);
}
} // namespace util
#endif