commit 6d96c712eb
Timofey K, 2024-04-22 18:02:31 +03:00
36 changed files with 2065 additions and 0 deletions

View file: exe/coroutine/impl.cpp

@ -0,0 +1,40 @@
#include <exe/coroutine/impl.hpp>
#include <wheels/support/assert.hpp>
#include <wheels/support/compiler.hpp>
namespace exe::coroutine {
CoroutineImpl::CoroutineImpl(Routine routine, wheels::MutableMemView stack)
: routine_(std::move(routine)) {
coroutine_context_.Setup(stack, this);
}
void CoroutineImpl::Run() {
try {
routine_();
} catch (...) {
exception_ptr_ = std::current_exception();
}
is_complete_ = true;
coroutine_context_.SwitchTo(save_previous_context_);
abort();
}
void CoroutineImpl::Resume() {
save_previous_context_.SwitchTo(coroutine_context_);
if (exception_ptr_) {
rethrow_exception(exception_ptr_);
}
}
void CoroutineImpl::Suspend() {
coroutine_context_.SwitchTo(save_previous_context_);
}
bool CoroutineImpl::IsCompleted() const {
return is_complete_;
}
} // namespace exe::coroutine

View file: exe/coroutine/impl.hpp

@ -0,0 +1,43 @@
#pragma once
#include <exe/coroutine/routine.hpp>
#include <context/context.hpp>
#include <wheels/memory/view.hpp>
#include <exception>
namespace exe::coroutine {
// Stackful asymmetric coroutine impl
// - Does not manage stacks
// - Unsafe Suspend
// Base for standalone coroutines, processors, fibers
class CoroutineImpl : public ::context::ITrampoline {
public:
CoroutineImpl(Routine routine, wheels::MutableMemView stack);
// Context: Caller
void Resume();
// Context: Coroutine
void Suspend();
// Context: Caller
bool IsCompleted() const;
private:
// context::ITrampoline
[[noreturn]] void Run() override;
private:
Routine routine_;
context::ExecutionContext coroutine_context_;
context::ExecutionContext save_previous_context_;
std::exception_ptr exception_ptr_;
bool is_complete_ = false;
};
} // namespace exe::coroutine

View file: exe/coroutine/routine.hpp

@ -0,0 +1,9 @@
#pragma once
#include <wheels/support/function.hpp>
namespace exe::coroutine {
using Routine = wheels::UniqueFunction<void()>;
} // namespace exe::coroutine

View file: exe/coroutine/standalone.cpp

@ -0,0 +1,36 @@
#include <exe/coroutine/standalone.hpp>
#include <twist/util/thread_local.hpp>
#include <wheels/support/assert.hpp>
#include <wheels/support/defer.hpp>
namespace exe::coroutine {
static twist::util::ThreadLocalPtr<Coroutine> current;
Coroutine::Coroutine(Routine routine)
: stack_(AllocateStack()), impl_(std::move(routine), stack_.View()) {
}
void Coroutine::Resume() {
Coroutine* prev = current.Exchange(this);
wheels::Defer rollback([prev]() {
current = prev;
});
impl_.Resume();
}
void Coroutine::Suspend() {
WHEELS_VERIFY(current, "Not a coroutine");
current->impl_.Suspend();
}
context::Stack Coroutine::AllocateStack() {
static const size_t kStackPages = 16; // 16 * 4KB = 64KB
return context::Stack::AllocatePages(kStackPages);
}
} // namespace exe::coroutine

View file: exe/coroutine/standalone.hpp

@ -0,0 +1,30 @@
#pragma once
#include <exe/coroutine/impl.hpp>
#include <context/stack.hpp>
namespace exe::coroutine {
// Standalone coroutine
class Coroutine {
public:
explicit Coroutine(Routine routine);
void Resume();
// Suspend current coroutine
static void Suspend();
bool IsCompleted() {
return impl_.IsCompleted();
}
private:
static context::Stack AllocateStack();
private:
context::Stack stack_;
CoroutineImpl impl_;
};
} // namespace exe::coroutine
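
A minimal usage sketch for the standalone coroutine (illustration only, not part of the commit; CoroutineDemo is a hypothetical name): the caller drives the routine with Resume(), and the routine hands control back via Coroutine::Suspend().

#include <exe/coroutine/standalone.hpp>
#include <wheels/support/assert.hpp>

void CoroutineDemo() {
  exe::coroutine::Coroutine co([] {
    // Runs on a separately allocated stack
    exe::coroutine::Coroutine::Suspend();  // Back to the caller
  });
  co.Resume();  // Enter the routine, return at Suspend()
  WHEELS_VERIFY(!co.IsCompleted(), "Suspended, not completed");
  co.Resume();  // Routine runs to completion
  WHEELS_VERIFY(co.IsCompleted(), "Completed");
}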

View file: exe/executors/execute.hpp

@ -0,0 +1,43 @@
#pragma once
#include <exe/executors/executor.hpp>
#include <type_traits>
#include <utility>
namespace exe::executors {
namespace detail {
template <typename T>
struct TaskBaseFunction : TaskBase {
explicit TaskBaseFunction(T task) : task_(std::move(task)) {
}
void Run() noexcept override {
try {
task_();
} catch (...) {
}
Discard();
}
void Discard() noexcept override {
delete this;
}
private:
T task_;
};
} // namespace detail
/*
* Usage:
* Execute(thread_pool, []() {
* std::cout << "Hi" << std::endl;
* });
*/
template <typename F>
void Execute(IExecutor& where, F&& f, Hint hint = Hint::UpToYou) {
where.Execute(new detail::TaskBaseFunction<std::decay_t<F>>(std::forward<F>(f)), hint);
}
} // namespace exe::executors

View file: exe/executors/executor.hpp

@ -0,0 +1,19 @@
#pragma once
#include <exe/executors/task.hpp>
namespace exe::executors {
enum class Hint {
UpToYou = 1, // Rely on executor scheduling decision
Next = 2, // Use LIFO scheduling
Yield = 3    // Fair scheduling: always push to the global FIFO queue
};
struct IExecutor {
virtual ~IExecutor() = default;
virtual void Execute(TaskBase* task, Hint hint) = 0;
};
} // namespace exe::executors

View file: exe/executors/task.hpp

@ -0,0 +1,19 @@
#pragma once
#include <wheels/intrusive/forward_list.hpp>
#include <functional>
#include <wheels/support/function.hpp>
namespace exe::executors {
struct ITask {
virtual ~ITask() = default;
virtual void Run() noexcept = 0;
virtual void Discard() noexcept = 0;
};
// Intrusive task
struct TaskBase : ITask, wheels::IntrusiveForwardListNode<TaskBase> {};
} // namespace exe::executors

View file: exe/executors/thread_pool.hpp

@ -0,0 +1,10 @@
#pragma once
#include <exe/executors/tp/fast/thread_pool.hpp>
namespace exe::executors {
// Default thread pool
using ThreadPool = tp::fast::ThreadPool;
} // namespace exe::executors

View file: exe/executors/tp/compute/blocking_queue.hpp

@ -0,0 +1,78 @@
#pragma once
#include <twist/stdlike/mutex.hpp>
#include <twist/stdlike/atomic.hpp>
#include <twist/stdlike/condition_variable.hpp>
#include <exe/executors/task.hpp>
#include <optional>
#include <deque>
namespace exe::executors {
class UnboundedBlockingQueue {
using T = TaskBase*;
public:
bool Put(T value) {
std::lock_guard guard(mutex_);
if (is_closed_) {
return false;
}
deque_.emplace_back(std::move(value));
not_empty_or_closed_.notify_one();
return true;
}
std::optional<T> Take() {
std::unique_lock lock(mutex_);
while (deque_.empty()) {
if (is_closed_) {
return std::nullopt;
}
not_empty_or_closed_.wait(lock);
}
return GetValue();
}
void Close() {
CloseImpl(false);
}
void Cancel() {
CloseImpl(true);
}
private:
std::optional<T> GetValue() {
if (deque_.empty()) {
return std::nullopt;
}
T result = std::move(deque_.front());
deque_.pop_front();
return result;
}
void CloseImpl(bool clear) {
  std::lock_guard guard(mutex_);
  is_closed_ = true;
  if (clear) {
    // Cancel: discard everything still queued; Close keeps pending
    // tasks so that Take() can drain them (discarding without clearing
    // would leave dangling pointers in the deque)
    for (auto& task : deque_) {
      task->Discard();
    }
    deque_.clear();
  }
  not_empty_or_closed_.notify_all();
}
bool is_closed_ = false;
twist::stdlike::condition_variable not_empty_or_closed_;
twist::stdlike::mutex mutex_;
std::deque<T> deque_;
};
} // namespace exe::executors
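
Usage sketch for the queue (illustration only; NopTask is a hypothetical task type): producers Put() intrusive tasks, workers Take() them; after Close(), Put() returns false, and Cancel() additionally discards whatever is still queued.

#include <exe/executors/tp/compute/blocking_queue.hpp>

struct NopTask : exe::executors::TaskBase {
  void Run() noexcept override {}
  void Discard() noexcept override {}  // Not heap-allocated: nothing to free
};

void QueueDemo() {
  exe::executors::UnboundedBlockingQueue queue;
  NopTask task;
  queue.Put(&task);  // Returns true while the queue is open
  if (auto next = queue.Take()) {  // Blocks until an item or Close()/Cancel()
    (*next)->Run();
  }
  queue.Close();  // Subsequent Put() calls return false
}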

View file: exe/executors/tp/compute/thread_pool.cpp

@ -0,0 +1,71 @@
#include <exe/executors/tp/compute/thread_pool.hpp>
#include <utility>
#include <cassert>
#include <wheels/logging/logging.hpp>
#include <twist/util/thread_local.hpp>
namespace exe::executors::tp::compute {
////////////////////////////////////////////////////////////////////////////////
static twist::util::ThreadLocalPtr<ThreadPool> pool;
////////////////////////////////////////////////////////////////////////////////
ThreadPool::ThreadPool(size_t workers) : wait_end_tasks_(0) {
for (size_t i = 0; i < workers; ++i) {
threads_.emplace_back(std::bind(&ThreadPool::Worker, this));
}
}
static void ExecuteTask(TaskBase* task) {
  task->Run();  // Run() is noexcept; tasks handle their own exceptions
}
void ThreadPool::Worker() {
pool = this;
while (true) {
auto task = queue_.Take();
if (!task) {
break;
}
ExecuteTask(*task);
wait_end_tasks_.Done();
}
}
ThreadPool::~ThreadPool() {
assert(threads_.empty());
}
void ThreadPool::Execute(TaskBase* task, Hint) {
wait_end_tasks_.Add();
if (!queue_.Put(task)) {
task->Discard();
}
}
void ThreadPool::WaitIdle() {
wait_end_tasks_.Wait();
}
void ThreadPool::Stop() {
queue_.Cancel();
for (auto& i : threads_) {
i.join();
}
threads_.clear();
}
ThreadPool* ThreadPool::Current() {
return pool;
}
} // namespace exe::executors::tp::compute

View file: exe/executors/tp/compute/thread_pool.hpp

@ -0,0 +1,45 @@
#pragma once
#include <exe/executors/executor.hpp>
#include <twist/stdlike/thread.hpp>
#include <vector>
#include <exe/support/wait_group.hpp>
#include <exe/executors/tp/compute/blocking_queue.hpp>
namespace exe::executors::tp::compute {
// Fixed-size pool of worker threads
class ThreadPool : public IExecutor {
public:
explicit ThreadPool(size_t workers);
~ThreadPool();
// Non-copyable
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// IExecutor
void Execute(TaskBase* task, Hint) override;
// Waits until outstanding work count has reached zero
void WaitIdle();
// Stops the worker threads as soon as possible
// Pending tasks will be discarded
void Stop();
// Locates current thread pool from worker thread
static ThreadPool* Current();
private:
void Worker();
private:
std::vector<twist::stdlike::thread> threads_;
UnboundedBlockingQueue queue_;
support::WaitGroup wait_end_tasks_;
};
} // namespace exe::executors::tp::compute
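
A usage sketch for the compute pool (illustration only; ComputePoolDemo is hypothetical), submitting through the Execute() helper from execute.hpp:

#include <exe/executors/tp/compute/thread_pool.hpp>
#include <exe/executors/execute.hpp>

void ComputePoolDemo() {
  exe::executors::tp::compute::ThreadPool pool{4};
  for (int i = 0; i < 128; ++i) {
    exe::executors::Execute(pool, [] {
      // Short CPU-bound task
    });
  }
  pool.WaitIdle();  // Blocks until the outstanding task count reaches zero
  pool.Stop();      // Must be called before the destructor runs
}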

View file: exe/executors/tp/fast/coordinator.cpp

@ -0,0 +1,30 @@
#include <exe/executors/tp/fast/coordinator.hpp>
#include <exe/executors/tp/fast/thread_pool.hpp>
namespace exe::executors::tp::fast {
Coordinator::Coordinator(ThreadPool& host, size_t workers)
: host_(host), workers_(workers), active_workers_(workers) {
for (size_t i = 0; i < workers; ++i) {
states_.emplace_back(WorkerState::kWorking);
}
}
void Coordinator::StopSpinning() {
if (spinning_workers_.fetch_sub(1) == 1) {
if (available_tasks_.load() > active_workers_.load()) {
NotifyOneWorker(host_.workers_);
}
}
}
void Coordinator::StartWorking(size_t index) {
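  // If our state is already kWorking, a notification was addressed to us
  // that we no longer need - forward it to another sleeping worker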
if (states_[index].exchange(WorkerState::kWorking) == WorkerState::kWorking) {
NotifyOneWorker(host_.workers_);
}
active_workers_.fetch_add(1);
}
void Coordinator::WakeUp() {
active_workers_.fetch_add(1);
}
} // namespace exe::executors::tp::fast

View file: exe/executors/tp/fast/coordinator.hpp

@ -0,0 +1,100 @@
#pragma once
#include <twist/stdlike/atomic.hpp>
#include <exe/support/wait_group.hpp>
#include <wheels/logging/logging.hpp>
#include <twist/stdlike/mutex.hpp>
#include <twist/stdlike/condition_variable.hpp>
#include <wheels/intrusive/list.hpp>
#include <exe/executors/tp/fast/worker.hpp>
#include <exe/support/spinlock.hpp>
#include <deque>
namespace exe::executors::tp::fast {
// Coordinates workers (stealing, parking)
enum class WorkerState : uint8_t { kWorking, kSleeping };
class Coordinator {
public:
Coordinator(ThreadPool& host, size_t workers);
void StartSpinning() {
spinning_workers_.fetch_add(1);
}
void StopSpinning();
bool StartSleeping(size_t index) {
if (is_stopped.load()) {
return false;
}
if (active_workers_.fetch_sub(1) <= available_tasks_.load()) {
active_workers_.fetch_add(1);
return false;
}
states_[index].store(WorkerState::kSleeping);
StopSpinning();
return true;
}
void StartWorking(size_t index);
void WakeUp();
void NotifyOneWorker(std::deque<Worker>& workers) {
if (active_workers_.load() == workers_ || spinning_workers_.load() != 0) {
return;
}
for (size_t i = 0; i < workers_; ++i) {
if (states_[i].exchange(WorkerState::kWorking) ==
WorkerState::kSleeping) {
workers[i].Notify();
return;
}
}
}
void NotifyAllWorkers(std::deque<Worker>& workers) {
for (size_t i = 0; i < workers_; ++i) {
workers[i].Notify();
}
}
void WaitIdle() {
wait_finished_tasks_.Wait();
}
void PushedTask() {
wait_finished_tasks_.Add();
available_tasks_.fetch_add(1);
}
void RunningTask() {
available_tasks_.fetch_sub(1);
}
void FinishTask() {
wait_finished_tasks_.Done();
}
twist::stdlike::atomic<bool> is_stopped{false};
private:
ThreadPool& host_;
size_t workers_ = 0;
twist::stdlike::atomic<size_t> active_workers_{0};
twist::stdlike::atomic<size_t> spinning_workers_{0};
twist::stdlike::atomic<size_t> available_tasks_{0};
std::deque<twist::stdlike::atomic<WorkerState>> states_;
support::WaitGroup wait_finished_tasks_;
};
} // namespace exe::executors::tp::fast

View file: exe/executors/tp/fast/metrics.hpp

@ -0,0 +1,23 @@
#pragma once
#include <cstdlib>
namespace exe::executors::tp::fast {
struct WorkerMetrics {
size_t count_solved_tasks = 0;
size_t count_pick_lifo_slot = 0;
size_t count_task_from_local_queue = 0;
size_t count_grab_from_global_queue = 0;
size_t count_park = 0;
size_t count_spinning = 0;
size_t count_offload = 0;
size_t count_sleep_rejects = 0;
  size_t count_stealing = 0;
};
struct PoolMetrics : WorkerMetrics {
  // Your metrics go here
};
} // namespace exe::executors::tp::fast

View file: exe/executors/tp/fast/queues/global_queue.hpp

@ -0,0 +1,71 @@
#pragma once
#include <exe/executors/task.hpp>
#include <wheels/intrusive/forward_list.hpp>
#include <wheels/logging/logging.hpp>
#include <twist/stdlike/mutex.hpp>
#include <span>
#include <exe/support/spinlock.hpp>
namespace exe::executors::tp::fast {
// Unbounded queue shared between workers
class GlobalQueue {
public:
void PushOne(TaskBase* item) {
std::lock_guard lock(guard_queue_);
queue_.PushBack(item);
}
void PushList(wheels::IntrusiveForwardList<TaskBase>& list) {
std::lock_guard lock(guard_queue_);
queue_.Append(list);
}
void Offload(std::span<TaskBase*> buffer) {
wheels::IntrusiveForwardList<TaskBase> list;
for (auto& i : buffer) {
list.PushBack(i);
}
PushList(list);
}
// Returns nullptr if queue is empty
TaskBase* TryPopOne() {
std::lock_guard lock(guard_queue_);
return queue_.PopFront();
}
// Returns number of items in `out_buffer`
size_t Grab(std::span<TaskBase*> out_buffer, size_t workers) {
std::lock_guard lock(guard_queue_);
size_t grab_size = (queue_.Size() + workers - 1) / workers;
size_t current_grab = 0;
for (auto& i : out_buffer) {
i = queue_.PopFront();
if (i == nullptr) {
break;
}
current_grab++;
if (current_grab == grab_size) {
break;
}
}
return current_grab;
}
private:
support::SpinLock guard_queue_;
wheels::IntrusiveForwardList<TaskBase> queue_;
};
} // namespace exe::executors::tp::fast

View file: exe/executors/tp/fast/queues/work_stealing_queue.hpp

@ -0,0 +1,130 @@
#pragma once
#include <twist/stdlike/atomic.hpp>
#include <array>
#include <span>
#include <wheels/logging/logging.hpp>
#include <exe/executors/task.hpp>
namespace exe::executors::tp::fast {
// Single producer / multiple consumers bounded queue
// for local tasks
template <size_t Capacity>
class WorkStealingQueue {
  using T = TaskBase;
  using Slot = T*;
 public:
  WorkStealingQueue() = default;
bool TryPush(T* item) {
size_t old_head = head_.load(std::memory_order::relaxed);
size_t old_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(old_head, old_tail)) {
return false;
}
buffer_[old_tail % Capacity].store(item, std::memory_order::relaxed);
tail_.store(old_tail + 1, std::memory_order::release);
return true;
}
void PushManyTasks(std::span<T*> tasks) {
size_t old_head = head_.load(std::memory_order::acquire);
size_t old_tail = tail_.load(std::memory_order::relaxed);
if (Capacity - Size(old_head, old_tail) < tasks.size()) {
WHEELS_PANIC("No free space in work-stealing queue");
}
for (size_t i = 0; i < tasks.size(); ++i) {
buffer_[(old_tail + i) % Capacity].store(tasks[i],
std::memory_order::relaxed);
}
tail_.store(old_tail + tasks.size(), std::memory_order::release);
}
// Returns nullptr if queue is empty
T* TryPop() {
T* result = nullptr;
if (GrabImpl({&result, &result + 1}, 1, std::memory_order::relaxed) == 0) {
return nullptr;
}
return result;
}
// Returns number of tasks
size_t Grab(std::span<T*> out_buffer) {
return GrabImpl(out_buffer, 1, std::memory_order::acquire);
}
size_t GrabPart(std::span<T*> out_buffer, size_t part) {
return GrabImpl(out_buffer, part, std::memory_order::acquire);
}
void DiscardAll() {
std::array<T*, Capacity> tasks;
size_t count = Grab(tasks);
for (auto task : std::span<T*>(tasks.begin(), tasks.begin() + count)) {
task->Discard();
}
}
private:
size_t GrabImpl(std::span<T*> out_buffer, size_t part,
std::memory_order memory_order_read_tail) {
size_t size_out_buffer = out_buffer.size();
if (size_out_buffer == 0) {
return 0;
}
size_t old_head = head_.load(std::memory_order::acquire);
while (true) {
size_t old_tail = tail_.load(memory_order_read_tail);
size_t grab_size =
std::min(Size(old_head, old_tail) / part, size_out_buffer);
if (grab_size == 0) {
return grab_size;
}
for (size_t i = 0; i < grab_size; ++i) {
out_buffer[i] =
buffer_[(old_head + i) % Capacity].load(std::memory_order::relaxed);
}
if (head_.compare_exchange_strong(old_head, old_head + grab_size,
std::memory_order::release,
std::memory_order::acquire)) {
return grab_size;
}
}
}
static bool IsEmpty(size_t head, size_t tail) {
return Size(head, tail) == 0;
}
static bool IsFull(size_t head, size_t tail) {
return Size(head, tail) >= Capacity;
}
static size_t Size(size_t head, size_t tail) {
return head > tail ? 0 : tail - head;
}
private:
std::array<twist::stdlike::atomic<Slot>, Capacity> buffer_;
twist::stdlike::atomic<size_t> tail_{0};
twist::stdlike::atomic<size_t> head_{0};
};
} // namespace exe::executors::tp::fast
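
A single-threaded sketch of the queue protocol (illustration only; NopTask and StealDemo are hypothetical): only the owning worker calls TryPush()/TryPop(), while thieves call Grab()/GrabPart() concurrently.

#include <exe/executors/tp/fast/queues/work_stealing_queue.hpp>
#include <array>
#include <cstddef>

struct NopTask : exe::executors::TaskBase {
  void Run() noexcept override {}
  void Discard() noexcept override {}
};

void StealDemo() {
  exe::executors::tp::fast::WorkStealingQueue<256> queue;
  std::array<NopTask, 4> tasks;
  for (auto& task : tasks) {
    queue.TryPush(&task);  // Owner only; false would mean the buffer is full
  }
  std::array<exe::executors::TaskBase*, 128> stolen;
  // A thief grabs 1/part of the queue: 4 / 2 = 2 tasks land in `stolen`
  size_t count = queue.GrabPart(stolen, /*part=*/2);
  // The owner consumes from the head as well
  exe::executors::TaskBase* next = queue.TryPop();
  (void)count;
  (void)next;
}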

View file: exe/executors/tp/fast/thread_pool.cpp

@ -0,0 +1,76 @@
#include <exe/executors/tp/fast/thread_pool.hpp>
#include <twist/util/thread_local.hpp>
namespace exe::executors::tp::fast {
ThreadPool::ThreadPool(size_t threads)
: count_threads_(threads), coordinator_(*this, threads) {
for (size_t i = 0; i < threads; ++i) {
workers_.emplace_back(*this, i);
}
for (size_t i = 0; i < threads; ++i) {
workers_[i].Start();
}
}
ThreadPool::~ThreadPool() {
  // Expects Stop() to have been called before destruction
}
void ThreadPool::Execute(TaskBase* task, Hint hint) {
coordinator_.PushedTask();
Push(task, hint);
coordinator_.NotifyOneWorker(workers_);
}
void ThreadPool::Push(TaskBase* task, Hint hint) {
if (hint == Hint::Yield) {
global_tasks_.PushOne(task);
} else if (Worker::Current() != nullptr &&
std::addressof(Worker::Current()->Host()) == this) {
if (hint == Hint::Next) {
Worker::Current()->PushToLifoSlot(task);
} else {
Worker::Current()->PushToLocalQueue(task);
}
} else {
global_tasks_.PushOne(task);
}
}
void ThreadPool::WaitIdle() {
coordinator_.WaitIdle();
}
void ThreadPool::Stop() {
coordinator_.is_stopped.store(true); // TODO
coordinator_.NotifyAllWorkers(workers_);
for (auto& worker : workers_) {
worker.Join();
}
while (auto next = global_tasks_.TryPopOne()) {
next->Discard();
}
}
std::vector<WorkerMetrics> ThreadPool::Metrics() const {
std::vector<WorkerMetrics> result;
for (auto& worker : workers_) {
result.emplace_back(worker.Metrics());
}
return result;
}
ThreadPool* ThreadPool::Current() {
auto current_worker = Worker::Current();
if (current_worker == nullptr) {
return nullptr;
}
return std::addressof(current_worker->Host());
}
} // namespace exe::executors::tp::fast

View file: exe/executors/tp/fast/thread_pool.hpp

@ -0,0 +1,53 @@
#pragma once
#include <exe/executors/executor.hpp>
#include <exe/executors/tp/fast/queues/global_queue.hpp>
#include <exe/executors/tp/fast/worker.hpp>
#include <exe/executors/tp/fast/coordinator.hpp>
#include <exe/executors/tp/fast/metrics.hpp>
// random_device
#include <twist/stdlike/random.hpp>
#include <deque>
namespace exe::executors::tp::fast {
// Scalable work-stealing scheduler for short-lived tasks
class ThreadPool : public IExecutor {
friend class Worker;
public:
explicit ThreadPool(size_t threads);
~ThreadPool();
// Non-copyable
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// IExecutor
void Execute(TaskBase* task, Hint hint = Hint::UpToYou) override;
void WaitIdle();
void Stop();
// After Stop
std::vector<WorkerMetrics> Metrics() const;
static ThreadPool* Current();
private:
void Push(TaskBase* task, Hint hint);
private:
size_t count_threads_;
std::deque<Worker> workers_;
friend class Coordinator;
Coordinator coordinator_;
GlobalQueue global_tasks_;
};
} // namespace exe::executors::tp::fast
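
A sketch of how the hints map onto the queues, following ThreadPool::Push() in thread_pool.cpp (illustration only; HintsDemo is hypothetical):

#include <exe/executors/thread_pool.hpp>  // ThreadPool = tp::fast::ThreadPool
#include <exe/executors/execute.hpp>

void HintsDemo() {
  exe::executors::ThreadPool pool{4};
  exe::executors::Execute(pool, [&pool] {
    // From a worker thread, Hint::Next targets this worker's LIFO slot...
    exe::executors::Execute(pool, [] {}, exe::executors::Hint::Next);
    // ...while Hint::Yield always goes to the global FIFO queue
    exe::executors::Execute(pool, [] {}, exe::executors::Hint::Yield);
    // Hint::UpToYou from a worker lands in its local queue,
    // and in the global queue otherwise
  });
  pool.WaitIdle();
  pool.Stop();
}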

View file: exe/executors/tp/fast/worker.cpp

@ -0,0 +1,229 @@
#include <exe/executors/tp/fast/worker.hpp>
#include <exe/executors/tp/fast/thread_pool.hpp>
#include <twist/strand/thread_local.hpp>
#include <array>
#include <utility>
namespace exe::executors::tp::fast {
TWIST_DECLARE_TL_PTR(Worker, worker);
Worker::Worker(ThreadPool& host, size_t index) : host_(host), index_(index) {
}
void Worker::Start() {
thread_.emplace([this]() {
worker = this;
Work();
});
}
void Worker::Join() {
thread_->join();
}
void Worker::PushToLocalQueue(TaskBase* task) {
if (!local_tasks_.TryPush(task)) {
OffloadTasksToGlobalQueue(task);
}
}
void Worker::PushToLifoSlot(TaskBase* task) {
if (lifo_slot_ != nullptr) {
PushToLocalQueue(lifo_slot_);
}
lifo_slot_ = task;
}
size_t Worker::StealTasks(std::span<TaskBase*> out_buffer) {
return local_tasks_.GrabPart(out_buffer, 2);
}
TaskBase* Worker::PickNextTask() {
// [Periodically] Global queue
// 0) LIFO slot
// 1) Local queue
// 2) Global queue
// 3) Work stealing
// 4) Park worker
TaskBase* result = TryPickNextTask();
if (result != nullptr) {
return result;
}
host_.coordinator_.StartSpinning();
while (true) {
++metrics_.count_spinning;
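    // Snapshot the wakeup counter before scanning the queues;
    // Park() re-checks it, so a Notify() sent during the scan is not lost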
uint32_t old = wakeups_.load();
result = TryStealTasks(8);
if (result != nullptr) {
++metrics_.count_stealing;
host_.coordinator_.StopSpinning();
if (host_.coordinator_.is_stopped.load()) {
result->Discard();
return nullptr;
}
return result;
}
if ((result = GrabTasksFromGlobalQueue()) != nullptr) {
host_.coordinator_.StopSpinning();
if (host_.coordinator_.is_stopped.load()) {
result->Discard();
return nullptr;
}
return result;
}
if (host_.coordinator_.StartSleeping(index_)) {
result = TryPickTaskBeforePark();
if (result != nullptr) {
host_.coordinator_.StartWorking(index_);
if (host_.coordinator_.is_stopped.load()) {
result->Discard();
return nullptr;
}
return result;
}
++metrics_.count_park;
Park(old);
host_.coordinator_.WakeUp();
host_.coordinator_.StartSpinning();
} else {
++metrics_.count_sleep_rejects;
}
if (host_.coordinator_.is_stopped.load()) {
host_.coordinator_.StopSpinning();
return nullptr;
}
}
}
void Worker::Work() {
while (TaskBase* next = PickNextTask()) {
host_.coordinator_.RunningTask();
next->Run();
++metrics_.count_solved_tasks;
host_.coordinator_.FinishTask();
}
local_tasks_.DiscardAll();
}
void Worker::Park(uint32_t old) {
while (wakeups_.load() == old) {
wakeups_.wait(old);
}
}
Worker* Worker::Current() {
return worker;
}
void Worker::OffloadTasksToGlobalQueue(TaskBase* task) {
++metrics_.count_offload;
std::array<TaskBase*, kLocalQueueCapacity / 2 + 1> buffer;
size_t count = local_tasks_.Grab({buffer.begin(), buffer.end() - 1});
buffer[count] = task;
host_.global_tasks_.Offload({buffer.begin(), buffer.begin() + count + 1});
}
TaskBase* Worker::GrabTasksFromGlobalQueue() {
std::array<TaskBase*, kLocalQueueCapacity / 2> buffer;
size_t count = host_.global_tasks_.Grab(buffer, host_.count_threads_);
if (count > 0) {
++metrics_.count_grab_from_global_queue;
local_tasks_.PushManyTasks({buffer.begin() + 1, buffer.begin() + count});
return buffer[0];
}
return nullptr;
}
TaskBase* Worker::TryStealTasksFrom(size_t index_worker) {
std::array<TaskBase*, kLocalQueueCapacity / 2> buffer;
size_t count =
host_.workers_[index_worker].StealTasks({buffer.begin(), buffer.end()});
if (count > 0) {
local_tasks_.PushManyTasks({buffer.begin() + 1, buffer.begin() + count});
return buffer[0];
}
return nullptr;
}
TaskBase* Worker::TryStealTasks(size_t series) {
for (size_t i = 0; i < series; ++i) {
TaskBase* result = TryStealTasksFrom(twister_() % host_.count_threads_);
if (result != nullptr) {
return result;
}
}
return nullptr;
}
TaskBase* Worker::TryPickTaskBeforePark() {
TaskBase* result = GrabTasksFromGlobalQueue();
if (result != nullptr) {
return result;
}
for (size_t i = 0; i < host_.count_threads_; ++i) {
result = TryStealTasksFrom(i);
if (result != nullptr) {
return result;
}
}
return nullptr;
}
TaskBase* Worker::TryPickNextTask() {
if (host_.coordinator_.is_stopped.load()) {
return nullptr;
}
TaskBase* result;
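  // Anti-starvation: periodically flush the LIFO slot so that tasks
  // in the local and global queues still make progress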
if (count_lifo_slot_ == 52) {
count_lifo_slot_ = 0;
if (lifo_slot_ != nullptr) {
PushToLocalQueue(std::exchange(lifo_slot_, nullptr));
}
} else {
++count_lifo_slot_;
result = std::exchange(lifo_slot_, nullptr);
if (result != nullptr) {
++metrics_.count_pick_lifo_slot;
return result;
}
}
result = local_tasks_.TryPop();
if (result != nullptr) {
++metrics_.count_task_from_local_queue;
return result;
}
if ((result = GrabTasksFromGlobalQueue()) != nullptr) {
return result;
}
return nullptr;
}
} // namespace exe::executors::tp::fast

View file: exe/executors/tp/fast/worker.hpp

@ -0,0 +1,99 @@
#pragma once
#include <exe/executors/task.hpp>
#include <exe/executors/tp/fast/metrics.hpp>
#include <exe/executors/tp/fast/queues/work_stealing_queue.hpp>
#include <twist/stdlike/atomic.hpp>
#include <twist/stdlike/thread.hpp>
#include <twist/stdlike/random.hpp>
#include <cstdlib>
#include <optional>
#include <random>
#include <span>
namespace exe::executors::tp::fast {
class ThreadPool;
class Worker {
private:
static const size_t kLocalQueueCapacity = 256;
public:
Worker(ThreadPool& host, size_t index);
void Start();
void Join();
// Submit task
void PushToLocalQueue(TaskBase* task);
void PushToLifoSlot(TaskBase* task);
// Steal from this worker
size_t StealTasks(std::span<TaskBase*> out_buffer);
static Worker* Current();
WorkerMetrics Metrics() const {
return metrics_;
}
ThreadPool& Host() const {
return host_;
}
// Wake parked worker
bool Notify() {
wakeups_.fetch_add(1);
wakeups_.notify_one();
return true;
}
private:
void OffloadTasksToGlobalQueue(TaskBase* task);
TaskBase* TryStealTasksFrom(size_t index_worker);
TaskBase* TryStealTasks(size_t series);
TaskBase* GrabTasksFromGlobalQueue();
TaskBase* TryPickNextTask();
TaskBase* TryPickTaskBeforePark();
// Blocking
TaskBase* PickNextTask();
// Run Loop
void Work();
void Park(uint32_t);
private:
ThreadPool& host_;
const size_t index_;
// Worker thread
std::optional<twist::stdlike::thread> thread_;
// Local queue
WorkStealingQueue<kLocalQueueCapacity> local_tasks_;
// For work stealing
std::mt19937_64 twister_{twist::stdlike::random_device()()};
// LIFO slot
TaskBase* lifo_slot_ = nullptr;
// Parking lot
twist::stdlike::atomic<uint32_t> wakeups_{0};
size_t count_lifo_slot_ = 0;
WorkerMetrics metrics_;
};
} // namespace exe::executors::tp::fast

View file: exe/fibers/core/api.hpp

@ -0,0 +1,31 @@
#pragma once
#include <exe/coroutine/routine.hpp>
#include <exe/executors/executor.hpp>
#include <exe/fibers/core/awaiter.hpp>
namespace exe::fibers {
using Routine = coroutine::Routine;
using Scheduler = executors::IExecutor;
// Considered harmful
// Starts a new fiber
void Go(Scheduler& scheduler, Routine routine);
// Starts a new fiber in the current scheduler
void Go(Routine routine);
namespace self {
void Yield(executors::Hint hint = executors::Hint::Yield);
// For synchronization primitives
// Do not use directly
void Suspend(exe::fibers::IContinuationStealingAwaiter*);
} // namespace self
} // namespace exe::fibers
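
A usage sketch of the fiber API (illustration only; FibersDemo is hypothetical), with the default ThreadPool alias as the scheduler:

#include <exe/fibers/core/api.hpp>
#include <exe/executors/thread_pool.hpp>

void FibersDemo() {
  exe::executors::ThreadPool scheduler{4};
  exe::fibers::Go(scheduler, [] {
    exe::fibers::self::Yield();  // Reschedule, give way to other fibers
    exe::fibers::Go([] {
      // Child fiber, runs in the same scheduler
    });
  });
  scheduler.WaitIdle();
  scheduler.Stop();
}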

View file: exe/fibers/core/awaiter.hpp

@ -0,0 +1,35 @@
#pragma once
#include <exe/fibers/core/handle.hpp>
namespace exe::fibers {
struct IContinuationStealingAwaiter {
  virtual ~IContinuationStealingAwaiter() = default;
  virtual FiberHandle TransferTo(FiberHandle fiber_handle) = 0;
};
class ContinuationStealingAwaiter : public IContinuationStealingAwaiter {
public:
explicit ContinuationStealingAwaiter(FiberHandle fiber_handle)
: fiber_handle_(fiber_handle) {
}
FiberHandle TransferTo(FiberHandle fiber_handle) override final {
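    // Schedule the incoming (suspended) fiber, then let the caller run
    // the stored continuation immediately on this thread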
auto safe = fiber_handle_;
fiber_handle.Schedule();
return safe;
}
private:
FiberHandle fiber_handle_;
};
struct IAwaiter : IContinuationStealingAwaiter {
  FiberHandle TransferTo(FiberHandle fiber_handle) final {
OnDispatch(fiber_handle);
return FiberHandle::Invalid();
}
virtual void OnDispatch(FiberHandle fiber_handle) = 0;
};
} // namespace exe::fibers

View file: exe/fibers/core/fiber.cpp

@ -0,0 +1,118 @@
#include <exe/fibers/core/fiber.hpp>
#include <exe/fibers/core/stacks.hpp>
#include <exe/executors/execute.hpp>
#include <twist/util/thread_local.hpp>
#include <wheels/logging/logging.hpp>
namespace exe::fibers {
namespace detail {
void FiberTask::Run() noexcept {
auto fiber_handle = static_cast<exe::fibers::Fiber*>(this)->Resume();
if (fiber_handle.IsValid()) {
fiber_handle.Resume();
}
}
void FiberTask::Discard() noexcept {
delete static_cast<exe::fibers::Fiber*>(this);
}
} // namespace detail
struct YieldAwaiter : IAwaiter {
explicit YieldAwaiter(executors::Hint hint) : hint(hint) {
}
void OnDispatch(FiberHandle fiber_handle) override {
fiber_handle.Schedule(hint);
}
executors::Hint hint;
};
//////////////////////////////////////////////////////////////////////
static twist::util::ThreadLocalPtr<Fiber> fiber;
Fiber::Fiber(Scheduler& scheduler, Routine co)
: scheduler_(scheduler),
stack_(AllocateStack()),
coroutine_(std::move(co), stack_.View()) {
}
void Fiber::Schedule(executors::Hint hint) {
scheduler_.Execute(static_cast<FiberTask*>(this), hint);
}
void Fiber::Yield(executors::Hint hint) {
YieldAwaiter awaiter(hint);
awaiter_ = &awaiter;
coroutine_.Suspend();
}
void Fiber::Step() {
Fiber* tmp = fiber.Exchange(this);
coroutine_.Resume();
fiber.Exchange(tmp);
}
FiberHandle Fiber::Dispatch() {
if (!coroutine_.IsCompleted()) {
auto tmp = awaiter_;
awaiter_ = nullptr;
return tmp->TransferTo(FiberHandle(this));
} else {
delete this;
return FiberHandle::Invalid();
}
}
Fiber& Fiber::Self() {
return *fiber;
}
Fiber::~Fiber() {
ReleaseStack(std::move(stack_));
}
void Fiber::Suspend(IContinuationStealingAwaiter* awaiter) {
awaiter_ = awaiter;
coroutine_.Suspend();
}
FiberHandle Fiber::Resume() {
Step();
return Dispatch();
}
Scheduler& Fiber::GetScheduler() {
return scheduler_;
}
//////////////////////////////////////////////////////////////////////
// API Implementation
void Go(Scheduler& scheduler, Routine routine) {
Fiber* f = new Fiber(scheduler, std::move(routine));
f->Schedule();
}
void Go(Routine routine) {
Go(Fiber::Self().GetScheduler(), std::move(routine));
}
namespace self {
void Yield(executors::Hint hint) {
Fiber::Self().Yield(hint);
}
void Suspend(exe::fibers::IContinuationStealingAwaiter* awaiter) {
Fiber::Self().Suspend(awaiter);
}
} // namespace self
} // namespace exe::fibers

View file: exe/fibers/core/fiber.hpp

@ -0,0 +1,52 @@
#pragma once
#include <exe/fibers/core/api.hpp>
#include <exe/coroutine/impl.hpp>
#include <context/stack.hpp>
#include <exe/fibers/core/awaiter.hpp>
#include <exe/executors/task.hpp>
namespace exe::fibers {
// Currently unused, kept from an experiment
enum class FiberState { Running, Suspended, Waiting };
namespace detail {
struct FiberTask : public executors::TaskBase {
virtual void Run() noexcept override;
virtual void Discard() noexcept override;
};
} // namespace detail
// Fiber = Stackful coroutine + Scheduler (Thread pool)
class Fiber : public detail::FiberTask {
friend detail::FiberTask;
public:
explicit Fiber(Scheduler& scheduler, Routine co);
void Schedule(executors::Hint hint = executors::Hint::UpToYou);
void Yield(executors::Hint hint);
FiberHandle Dispatch();
void Suspend(IContinuationStealingAwaiter*);
FiberHandle Resume();
Scheduler& GetScheduler();
static Fiber& Self();
~Fiber();
private:
// Task
void Step();
private:
Scheduler& scheduler_;
context::Stack stack_;
exe::coroutine::CoroutineImpl coroutine_;
IContinuationStealingAwaiter* awaiter_ = nullptr;
};
} // namespace exe::fibers

View file: exe/fibers/core/handle.cpp

@ -0,0 +1,27 @@
#include <exe/fibers/core/handle.hpp>
#include <exe/fibers/core/fiber.hpp>
#include <wheels/support/assert.hpp>
#include <utility>
namespace exe::fibers {
Fiber* FiberHandle::Release() {
WHEELS_ASSERT(fiber_ != nullptr, "Invalid fiber handle");
return std::exchange(fiber_, nullptr);
}
void FiberHandle::Schedule(executors::Hint hint) {
Release()->Schedule(hint);
}
void FiberHandle::Resume() {
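  // Trampoline: run each next fiber iteratively rather than recursively,
  // so continuation stealing cannot overflow the thread stack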
auto fh = Release()->Resume();
while (fh.IsValid()) {
fh = fh.Release()->Resume();
}
}
} // namespace exe::fibers

View file: exe/fibers/core/handle.hpp

@ -0,0 +1,41 @@
#pragma once
#include <exe/executors/executor.hpp>
namespace exe::fibers {
class Fiber;
// Lightweight non-owning handle to a _suspended_ fiber object
class FiberHandle {
friend class Fiber;
public:
FiberHandle() : FiberHandle(nullptr) {
}
static FiberHandle Invalid() {
return FiberHandle(nullptr);
}
bool IsValid() const {
return fiber_ != nullptr;
}
// Schedule to an associated scheduler
void Schedule(executors::Hint hint = executors::Hint::UpToYou);
// Resume immediately in the current thread
void Resume();
private:
explicit FiberHandle(Fiber* fiber) : fiber_(fiber) {
}
Fiber* Release();
private:
Fiber* fiber_;
};
} // namespace exe::fibers

View file: exe/fibers/core/stacks.cpp

@ -0,0 +1,54 @@
#include <exe/fibers/core/stacks.hpp>
#include <deque>
#include <exe/support/spinlock.hpp>
using context::Stack;
namespace exe::fibers {
//////////////////////////////////////////////////////////////////////
class StackAllocator {
public:
Stack Allocate() {
{
std::lock_guard guard(guard_stacks_);
if (!stacks_.empty()) {
auto stack = std::move(stacks_.back());
stacks_.pop_back();
return stack;
}
}
return AllocateNewStack();
}
void Release(Stack stack) {
std::lock_guard guard(guard_stacks_);
stacks_.push_back(std::move(stack));
}
private:
static Stack AllocateNewStack() {
static const size_t kStackPages = 16; // 16 * 4KB = 64KB
return Stack::AllocatePages(kStackPages);
}
private:
std::deque<Stack> stacks_;
support::SpinLock guard_stacks_;
};
//////////////////////////////////////////////////////////////////////
static StackAllocator allocator;
context::Stack AllocateStack() {
return allocator.Allocate();
}
void ReleaseStack(context::Stack stack) {
allocator.Release(std::move(stack));
}
} // namespace exe::fibers

View file: exe/fibers/core/stacks.hpp

@ -0,0 +1,10 @@
#pragma once
#include <context/stack.hpp>
namespace exe::fibers {
context::Stack AllocateStack();
void ReleaseStack(context::Stack stack);
} // namespace exe::fibers

View file: exe/fibers/sync/condvar.hpp

@ -0,0 +1,41 @@
#pragma once
#include <exe/fibers/sync/mutex.hpp>
#include <exe/fibers/sync/futex.hpp>
#include <twist/stdlike/atomic.hpp>
// std::unique_lock
#include <mutex>
namespace exe::fibers {
class CondVar {
using Lock = std::unique_lock<Mutex>;
public:
void Wait(Lock& lock) {
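    // Snapshot the notification count before releasing the lock:
    // any Notify*() after unlock() changes it, so ParkIfEqual() won't park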
uint32_t old = count_notifies_.load();
lock.unlock();
notifier_.ParkIfEqual(old);
lock.lock();
}
void NotifyOne() {
count_notifies_.fetch_add(1);
notifier_.WakeOne();
}
void NotifyAll() {
count_notifies_.fetch_add(1);
notifier_.WakeAll();
}
private:
twist::stdlike::atomic<uint32_t> count_notifies_{0};
FutexLike<uint32_t> notifier_{count_notifies_};
};
} // namespace exe::fibers

View file: exe/fibers/sync/futex.hpp

@ -0,0 +1,103 @@
#pragma once
#include <exe/fibers/core/api.hpp>
#include <twist/stdlike/atomic.hpp>
#include <wheels/intrusive/list.hpp>
#include <wheels/intrusive/forward_list.hpp>
#include <wheels/support/assert.hpp>
#include <exe/support/spinlock.hpp>
#include <exe/fibers/core/fiber.hpp>
#include <wheels/logging/logging.hpp>
namespace exe::fibers {
template <class T>
concept Unlockable = requires(T x) {
x.Unlock();
};
template <Unlockable T>
class FutexAwaiter : public IAwaiter,
public wheels::IntrusiveForwardListNode<FutexAwaiter<T>> {
public:
explicit FutexAwaiter(T& value) : value_(value) {
}
void OnDispatch(FiberHandle fiber_handle) override {
fiber_handle_ = fiber_handle;
value_.Unlock();
}
void Schedule() {
fiber_handle_.Schedule();
}
private:
T& value_;
FiberHandle fiber_handle_;
};
template <typename T>
class FutexLike {
public:
explicit FutexLike(twist::stdlike::atomic<T>& cell) : cell_(cell) {
}
~FutexLike() {
  WHEELS_VERIFY(sleeping_queue_.IsEmpty(), "Fibers still parked on this futex");
}
// Park current fiber if cell.load() == `old`
void ParkIfEqual(T old) {
guard_sleep_queue_.lock();
if (cell_.load() != old) {
guard_sleep_queue_.unlock();
return;
}
auto awaiter = FutexAwaiter(guard_sleep_queue_);
sleeping_queue_.PushBack(&awaiter);
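    // The spinlock is released by the awaiter on the scheduler side,
    // only after this fiber has fully suspended - no lost wakeups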
self::Suspend(&awaiter);
}
void WakeOne() {
std::lock_guard g(guard_sleep_queue_);
if (sleeping_queue_.IsEmpty()) {
return;
}
sleeping_queue_.PopFront()->Schedule();
}
void WakeAll() {
std::unique_lock lock(guard_sleep_queue_);
if (sleeping_queue_.IsEmpty()) {
return;
}
auto wakeup_queue = std::move(sleeping_queue_);
lock.unlock();
while (!wakeup_queue.IsEmpty()) {
wakeup_queue.PopFront()->Schedule();
}
}
private:
exe::support::SpinLock guard_sleep_queue_;
twist::stdlike::atomic<T>& cell_;
wheels::IntrusiveForwardList<FutexAwaiter<support::SpinLock>> sleeping_queue_;
};
} // namespace exe::fibers
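
To illustrate FutexLike, a minimal one-shot event in the same style as the CondVar from this commit (a hypothetical sketch, not part of the commit):

#include <exe/fibers/sync/futex.hpp>
#include <cstdint>

// One-shot event: Wait() parks the fiber until Fire() is called
class OneShotEvent {
 public:
  void Wait() {
    while (fired_.load() == 0) {
      futex_.ParkIfEqual(0);  // Returns immediately if already fired
    }
  }
  void Fire() {
    fired_.store(1);
    futex_.WakeAll();
  }

 private:
  twist::stdlike::atomic<uint32_t> fired_{0};
  exe::fibers::FutexLike<uint32_t> futex_{fired_};
};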

View file: exe/fibers/sync/mutex.hpp

@ -0,0 +1,57 @@
#pragma once
#include <exe/fibers/sync/futex.hpp>
#include <twist/stdlike/atomic.hpp>
#include <exe/support/lockfree_mutex_queue.hpp>
namespace exe::fibers {
class Mutex {
struct Node : public detail::mutex::IntrusiveNode {
FiberHandle fiber_handle;
};
struct MutexAwaiter : public IAwaiter {
explicit MutexAwaiter(Mutex& mutex) : mutex(mutex) {
}
void OnDispatch(FiberHandle fiber_handle) override {
node.fiber_handle = fiber_handle;
if (mutex.awaiters_.TryLockOrEnqueue(&node)) {
node.fiber_handle.Schedule(executors::Hint::Next);
}
}
Mutex& mutex;
Node node;
};
public:
void Lock() {
if (awaiters_.TryLock()) {
return;
}
MutexAwaiter mutex_awaiter(*this);
fibers::self::Suspend(&mutex_awaiter);
}
void Unlock() {
auto node = awaiters_.TryPopOrUnlock();
if (node != nullptr) {
auto awaiter = fibers::ContinuationStealingAwaiter(node->fiber_handle);
fibers::self::Suspend(&awaiter);
}
}
void lock() { // NOLINT
Lock();
}
void unlock() { // NOLINT
Unlock();
}
 private:
  detail::mutex::LockfreeIntrusiveQueue<Node> awaiters_;
};
} // namespace exe::fibers
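
Usage sketch (illustration only; SpawnIncrement is hypothetical): the lowercase lock()/unlock() aliases make Mutex BasicLockable, so std::lock_guard works inside a fiber:

#include <exe/fibers/core/api.hpp>
#include <exe/fibers/sync/mutex.hpp>
#include <cstddef>
#include <mutex>  // std::lock_guard

void SpawnIncrement(exe::fibers::Mutex& mu, size_t& shared_value) {
  exe::fibers::Go([&mu, &shared_value] {
    std::lock_guard guard(mu);  // Lock() may suspend the fiber, not the thread
    ++shared_value;
  });
}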

View file: exe/fibers/sync/wait_group.hpp

@ -0,0 +1,57 @@
#pragma once
#include <exe/fibers/sync/futex.hpp>
#include <twist/stdlike/atomic.hpp>
namespace exe::fibers {
// https://gobyexample.com/waitgroups
namespace detail {
class WaitGroupParking {
public:
void Notify() {
done_.store(1);
notifier_.WakeAll();
done_.store(2);
}
void Wait() {
while (done_.load() == 0) {
notifier_.ParkIfEqual(0);
}
while (done_.load() == 1) {
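      // Wait out Notify(): once it stores 2, notifier_ is no longer
      // touched and the WaitGroup may be destroyed safely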
self::Yield();
}
}
private:
// done_ states: 0 - waiting for Done(), 1 - notification in progress, 2 - released
twist::stdlike::atomic<uint32_t> done_{0};
FutexLike<uint32_t> notifier_{done_};
};
} // namespace detail
class WaitGroup {
public:
void Add(size_t count) {
counter_.fetch_add(count);
}
void Done() {
if (counter_.fetch_sub(1) == 1) {
releaser_.Notify();
}
}
void Wait() {
releaser_.Wait();
}
private:
twist::stdlike::atomic<uint32_t> counter_{0};
detail::WaitGroupParking releaser_;
};
} // namespace exe::fibers
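
A fork-join usage sketch (illustration only; ForkJoin is hypothetical and must itself run inside a fiber, since Wait() parks the caller):

#include <exe/fibers/core/api.hpp>
#include <exe/fibers/sync/wait_group.hpp>
#include <cstddef>

void ForkJoin(size_t workers) {
  exe::fibers::WaitGroup wg;
  wg.Add(workers);
  for (size_t i = 0; i < workers; ++i) {
    exe::fibers::Go([&wg] {
      // ... parallel work ...
      wg.Done();
    });
  }
  wg.Wait();  // Parks until the counter drops to zero
}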

View file: exe/support/lockfree_mutex_queue.hpp

@ -0,0 +1,108 @@
#pragma once
#include <cstdint>
#include <utility>
#include <twist/stdlike/atomic.hpp>
#include <wheels/support/assert.hpp>  // for WHEELS_PANIC
namespace exe::fibers::detail::mutex {
struct IntrusiveNode {
uintptr_t next = 0;
};
template <typename T>
class LockfreeIntrusiveQueue {
using Node = IntrusiveNode;
enum State : uintptr_t { kUnlock = 1, kLock = 0 };
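  // tail_ encodes the whole mutex state:
  //   kUnlock (1)  - unlocked, no waiters
  //   kLock (0)    - locked, no waiters
  //   node address - locked, intrusive LIFO stack of waiters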
public:
// if tail == kUnlock then try cas to kLock
// else try push
// return true if successful lock
bool TryLockOrEnqueue(Node* new_node) {
new_node->next = tail_.load();
while (true) {
if (new_node->next == State::kUnlock) {
if (tail_.compare_exchange_weak(new_node->next, State::kLock)) {
return true;
}
} else {
if (tail_.compare_exchange_weak(new_node->next, ToUIntPtr(new_node))) {
return false;
}
}
}
}
T* TryPopOrUnlock() {
if (output_ == nullptr) {
auto output = GrabOrUnlock();
if (output == nullptr) {
return nullptr;
}
output_ = output;
}
return PopFromOutput();
}
T* PopFromOutput() {
return static_cast<T*>(std::exchange(output_, ToNode(output_->next)));
}
bool TryLock() {
auto old = tail_.load();
if (old == State::kUnlock) {
return tail_.compare_exchange_weak(old, State::kLock);
}
return false;
}
Node* GrabOrUnlock() {
uintptr_t old = tail_.load();
while (true) {
if (old == State::kLock) {
if (tail_.compare_exchange_weak(old, State::kUnlock)) {
return nullptr;
}
} else {
return ReverseStack(tail_.exchange(State::kLock));
}
}
}
private:
static uintptr_t ToUIntPtr(Node* node) {
return reinterpret_cast<uintptr_t>(node);
}
static Node* ToNode(uintptr_t ptr) {
return reinterpret_cast<Node*>(ptr);
}
static Node* ReverseStack(uintptr_t ptr) {
if (ptr == State::kUnlock || ptr == State::kLock) {
WHEELS_PANIC("reverse stack");
}
uintptr_t prev = ptr;
ptr = ToNode(ptr)->next;
ToNode(prev)->next = State::kLock;
while (ptr != State::kLock) {
uintptr_t next = ToNode(ptr)->next;
ToNode(ptr)->next = prev;
prev = ptr;
ptr = next;
}
return ToNode(prev);
}
twist::stdlike::atomic<uintptr_t> tail_{State::kUnlock};
Node* output_ = nullptr;
};
} // namespace exe::fibers::detail::mutex

View file: exe/support/spinlock.hpp

@ -0,0 +1,41 @@
#pragma once
#include <twist/stdlike/atomic.hpp>
#include <twist/util/spin_wait.hpp>
#include <wheels/logging/logging.hpp>
namespace exe::support {
// Test-and-TAS spinlock
class SpinLock {
public:
void Lock() {
while (is_locked_.exchange(true, std::memory_order::acquire)) {
twist::util::SpinWait s;
while (is_locked_.load(std::memory_order::relaxed)) {
s.Spin();
}
}
}
void Unlock() {
is_locked_.store(false, std::memory_order::release);
}
// BasicLockable
void lock() { // NOLINT
Lock();
}
void unlock() { // NOLINT
Unlock();
}
private:
twist::stdlike::atomic<bool> is_locked_{false};
};
} // namespace exe::support

View file: exe/support/wait_group.hpp

@ -0,0 +1,36 @@
#pragma once
#include <mutex>
#include <twist/stdlike/atomic.hpp>
#include <wheels/logging/logging.hpp>
namespace exe::support {
class WaitGroup {
public:
explicit WaitGroup(uint32_t count = 0) : counter_(count) {
}
void Add() {
zero_notifier_.store(0);
counter_.fetch_add(1);
}
void Done() {
if (counter_.fetch_sub(1) == 1) {
zero_notifier_.fetch_add(1);
zero_notifier_.notify_all();
}
}
void Wait() {
  // Re-read the notifier on every iteration: waiting on a stale value
  // would degrade into a busy loop after the first release
  uint32_t old = zero_notifier_.load();
  while (counter_.load() != 0) {
    zero_notifier_.wait(old);
    old = zero_notifier_.load();
  }
}
private:
twist::stdlike::atomic<uint32_t> counter_{0};
twist::stdlike::atomic<uint32_t> zero_notifier_{0};
};
} // namespace exe::support