File size: 3,819 Bytes
9c774f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#pragma once
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
#include <concepts>
#include <thread>
static inline size_t avail_threads(){
	return std::thread::hardware_concurrency();
}

template <typename F, typename... Args>
using irt = std::invoke_result_t<F, Args...>;

template <typename F, typename... Args>
static inline auto taskify(const F& funk, Args... args){
	return std::bind(funk, std::forward<Args...>(args)...);
}

static inline auto promitask(const std::invocable<> auto& funk){
	typedef irt<decltype(funk)> ret;
	auto pr = std::make_shared<std::promise<ret>>();
	if constexpr (std::same_as<irt<decltype(funk)>, void>)
		return make_pair([=](){ funk(), pr->set_value(); }, pr);
	else
		return make_pair([=](){ pr->set_value(funk()); }, pr);
}

static inline auto await_many(const std::ranges::range auto& fu){
	std::for_each(fu.begin(), fu.end(), [](auto&& f){ f.wait(); });
}

typedef std::function<void()> tasque;

template <typename Thrd = std::jthread>
class threadpool {
	/// If true the queue thread should exit
	std::atomic<bool> done;

	/// The thread object associated with this queue
	std::vector<Thrd> queue_threads;
	/// A queue of functions that will be executed on the queue thread
	std::queue<tasque> work_queue;

	/// The mutex used in the condition variable
	std::mutex queue_mutex;

	/// The condition variable that waits for a new function to be inserted in the
	/// queue
	std::condition_variable cond;

	/// This funciton executes on the queue_thread
	void queue_runner() {
		while (!done) {
			tasque func;
			{
				std::unique_lock<std::mutex> lock(queue_mutex);
				cond.wait( lock
						 , [this]() { return work_queue.empty() == false || done; });

				if (!done){
					swap(func, work_queue.front());
					work_queue.pop();
				}
			}
			if (func) func();
		}
	}

	void qup(const std::invocable<> auto& f){
		std::lock_guard<std::mutex> lock(queue_mutex);
		work_queue.push(f);
		cond.notify_one();
	}

public:
	template <typename F, typename... Args>
	void enqueue(const F& func, Args... args) requires std::invocable<F, Args...> {
		qup(taskify(func, args...));
	}

	template <typename F, typename... Args>
	auto inquire(const F& func, Args... args) requires std::invocable<F, Args...> {
		auto [t, pr] = promitask(taskify(func, args...));
		auto fut = pr->get_future();
		enqueue(t);
		return fut;
	}

	void clear() {
		{
			std::lock_guard<std::mutex> lock(queue_mutex);
			while(!work_queue.empty())
				work_queue.pop();
		}
		sync();
	}

	void sync(){
		std::atomic<size_t> n(0);
		const size_t m = queue_threads.size();
		auto present = [&](){ ++n; size_t l = n.load(); while(l < m) l = n.load(); };
		std::vector<std::future<void>> fu;
		std::ranges::generate_n(std::back_inserter(fu), m, [=, this](){ return inquire(present); });
		await_many(fu);
	}

	threadpool(size_t n, size_t res) : done(false)
									 , queue_threads(n ? std::clamp(n, size_t(1), avail_threads() - res)
													   : std::max(size_t(1), avail_threads() - res)) {
		for(auto& i:queue_threads){
			Thrd tmp(&threadpool::queue_runner, this);
			std::swap(i, tmp);
		}
	}
	threadpool(size_t n) : threadpool(n, 0) {}
	threadpool() : threadpool(0, 1) {}

	~threadpool() {
		sync();
		done.store(true);
		cond.notify_all();
	}

	threadpool(const threadpool& other) : work_queue(other.work_queue), done(false) {
		for(auto& i:queue_threads){
			Thrd tmp(&threadpool::queue_runner, this);
			std::swap(i, tmp);
		}
	}

	threadpool& operator=(const threadpool& other){
		clear();
		work_queue = other.work_queue;
		return *this;
	}
	size_t size() const { return queue_threads.size(); }
	threadpool& operator=(threadpool&& other) = default;
	threadpool(threadpool&& other) = default;
};