From 6d96c712eba673d236fcfc37286dec1c26a4d222 Mon Sep 17 00:00:00 2001 From: Timofey K Date: Mon, 22 Apr 2024 18:02:31 +0300 Subject: [PATCH] init --- fibers/scheduler/exe/coroutine/impl.cpp | 40 +++ fibers/scheduler/exe/coroutine/impl.hpp | 43 ++++ fibers/scheduler/exe/coroutine/routine.hpp | 9 + fibers/scheduler/exe/coroutine/standalone.cpp | 36 +++ fibers/scheduler/exe/coroutine/standalone.hpp | 30 +++ fibers/scheduler/exe/executors/execute.hpp | 43 ++++ fibers/scheduler/exe/executors/executor.hpp | 19 ++ fibers/scheduler/exe/executors/task.hpp | 19 ++ .../scheduler/exe/executors/thread_pool.hpp | 10 + .../executors/tp/compute/blocking_queue.hpp | 78 ++++++ .../exe/executors/tp/compute/thread_pool.cpp | 71 ++++++ .../exe/executors/tp/compute/thread_pool.hpp | 45 ++++ .../exe/executors/tp/fast/coordinator.cpp | 30 +++ .../exe/executors/tp/fast/coordinator.hpp | 100 ++++++++ .../exe/executors/tp/fast/metrics.hpp | 23 ++ .../executors/tp/fast/queues/global_queue.hpp | 71 ++++++ .../tp/fast/queues/work_stealing_queue.hpp | 130 ++++++++++ .../exe/executors/tp/fast/thread_pool.cpp | 76 ++++++ .../exe/executors/tp/fast/thread_pool.hpp | 53 ++++ .../exe/executors/tp/fast/worker.cpp | 229 ++++++++++++++++++ .../exe/executors/tp/fast/worker.hpp | 99 ++++++++ fibers/scheduler/exe/fibers/core/api.hpp | 31 +++ fibers/scheduler/exe/fibers/core/awaiter.hpp | 35 +++ fibers/scheduler/exe/fibers/core/fiber.cpp | 118 +++++++++ fibers/scheduler/exe/fibers/core/fiber.hpp | 52 ++++ fibers/scheduler/exe/fibers/core/handle.cpp | 27 +++ fibers/scheduler/exe/fibers/core/handle.hpp | 41 ++++ fibers/scheduler/exe/fibers/core/stacks.cpp | 54 +++++ fibers/scheduler/exe/fibers/core/stacks.hpp | 10 + fibers/scheduler/exe/fibers/sync/condvar.hpp | 41 ++++ fibers/scheduler/exe/fibers/sync/futex.hpp | 103 ++++++++ fibers/scheduler/exe/fibers/sync/mutex.hpp | 57 +++++ .../scheduler/exe/fibers/sync/wait_group.hpp | 57 +++++ .../exe/support/lockfree_mutex_queue.hpp | 108 +++++++++ fibers/scheduler/exe/support/spinlock.hpp | 41 ++++ fibers/scheduler/exe/support/wait_group.hpp | 36 +++ 36 files changed, 2065 insertions(+) create mode 100644 fibers/scheduler/exe/coroutine/impl.cpp create mode 100644 fibers/scheduler/exe/coroutine/impl.hpp create mode 100644 fibers/scheduler/exe/coroutine/routine.hpp create mode 100644 fibers/scheduler/exe/coroutine/standalone.cpp create mode 100644 fibers/scheduler/exe/coroutine/standalone.hpp create mode 100644 fibers/scheduler/exe/executors/execute.hpp create mode 100644 fibers/scheduler/exe/executors/executor.hpp create mode 100644 fibers/scheduler/exe/executors/task.hpp create mode 100644 fibers/scheduler/exe/executors/thread_pool.hpp create mode 100644 fibers/scheduler/exe/executors/tp/compute/blocking_queue.hpp create mode 100644 fibers/scheduler/exe/executors/tp/compute/thread_pool.cpp create mode 100644 fibers/scheduler/exe/executors/tp/compute/thread_pool.hpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/coordinator.cpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/coordinator.hpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/metrics.hpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/queues/global_queue.hpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/queues/work_stealing_queue.hpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/thread_pool.cpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/thread_pool.hpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/worker.cpp create mode 100644 fibers/scheduler/exe/executors/tp/fast/worker.hpp create mode 100644 fibers/scheduler/exe/fibers/core/api.hpp create mode 100644 fibers/scheduler/exe/fibers/core/awaiter.hpp create mode 100644 fibers/scheduler/exe/fibers/core/fiber.cpp create mode 100644 fibers/scheduler/exe/fibers/core/fiber.hpp create mode 100644 fibers/scheduler/exe/fibers/core/handle.cpp create mode 100644 fibers/scheduler/exe/fibers/core/handle.hpp create mode 100644 fibers/scheduler/exe/fibers/core/stacks.cpp create mode 100644 fibers/scheduler/exe/fibers/core/stacks.hpp create mode 100644 fibers/scheduler/exe/fibers/sync/condvar.hpp create mode 100644 fibers/scheduler/exe/fibers/sync/futex.hpp create mode 100644 fibers/scheduler/exe/fibers/sync/mutex.hpp create mode 100644 fibers/scheduler/exe/fibers/sync/wait_group.hpp create mode 100644 fibers/scheduler/exe/support/lockfree_mutex_queue.hpp create mode 100644 fibers/scheduler/exe/support/spinlock.hpp create mode 100644 fibers/scheduler/exe/support/wait_group.hpp diff --git a/fibers/scheduler/exe/coroutine/impl.cpp b/fibers/scheduler/exe/coroutine/impl.cpp new file mode 100644 index 0000000..3e08977 --- /dev/null +++ b/fibers/scheduler/exe/coroutine/impl.cpp @@ -0,0 +1,40 @@ +#include + +#include +#include + +namespace exe::coroutine { + +CoroutineImpl::CoroutineImpl(Routine routine, wheels::MutableMemView stack) + : routine_(std::move(routine)) { + coroutine_context_.Setup(stack, this); +} + +void CoroutineImpl::Run() { + try { + routine_(); + } catch (...) { + exception_ptr_ = std::current_exception(); + } + + is_complete_ = true; + coroutine_context_.SwitchTo(save_previous_context_); + abort(); +} + +void CoroutineImpl::Resume() { + save_previous_context_.SwitchTo(coroutine_context_); + if (exception_ptr_) { + rethrow_exception(exception_ptr_); + } +} + +void CoroutineImpl::Suspend() { + coroutine_context_.SwitchTo(save_previous_context_); +} + +bool CoroutineImpl::IsCompleted() const { + return is_complete_; +} + +} // namespace exe::coroutine diff --git a/fibers/scheduler/exe/coroutine/impl.hpp b/fibers/scheduler/exe/coroutine/impl.hpp new file mode 100644 index 0000000..7df0b40 --- /dev/null +++ b/fibers/scheduler/exe/coroutine/impl.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include + +#include + +#include + +#include + +namespace exe::coroutine { + +// Stackful asymmetric coroutine impl +// - Does not manage stacks +// - Unsafe Suspend +// Base for standalone coroutines, processors, fibers + +class CoroutineImpl : public ::context::ITrampoline { + public: + CoroutineImpl(Routine routine, wheels::MutableMemView stack); + + // Context: Caller + void Resume(); + + // Context: Coroutine + void Suspend(); + + // Context: Caller + bool IsCompleted() const; + + private: + // context::ITrampoline + [[noreturn]] void Run() override; + + private: + Routine routine_; + context::ExecutionContext coroutine_context_; + context::ExecutionContext save_previous_context_; + std::exception_ptr exception_ptr_; + bool is_complete_ = false; +}; + +} // namespace exe::coroutine diff --git a/fibers/scheduler/exe/coroutine/routine.hpp b/fibers/scheduler/exe/coroutine/routine.hpp new file mode 100644 index 0000000..f451787 --- /dev/null +++ b/fibers/scheduler/exe/coroutine/routine.hpp @@ -0,0 +1,9 @@ +#pragma once + +#include + +namespace exe::coroutine { + +using Routine = wheels::UniqueFunction; + +} // namespace exe::coroutine diff --git a/fibers/scheduler/exe/coroutine/standalone.cpp b/fibers/scheduler/exe/coroutine/standalone.cpp new file mode 100644 index 0000000..5c0a85d --- /dev/null +++ b/fibers/scheduler/exe/coroutine/standalone.cpp @@ -0,0 +1,36 @@ +#include + +#include + +#include +#include + +namespace exe::coroutine { + +static twist::util::ThreadLocalPtr current; + +Coroutine::Coroutine(Routine routine) + : stack_(AllocateStack()), impl_(std::move(routine), stack_.View()) { +} + +void Coroutine::Resume() { + Coroutine* prev = current.Exchange(this); + + wheels::Defer rollback([prev]() { + current = prev; + }); + + impl_.Resume(); +} + +void Coroutine::Suspend() { + WHEELS_VERIFY(current, "Not a coroutine"); + current->impl_.Suspend(); +} + +context::Stack Coroutine::AllocateStack() { + static const size_t kStackPages = 16; // 16 * 4KB = 64KB + return context::Stack::AllocatePages(kStackPages); +} + +} // namespace exe::coroutine diff --git a/fibers/scheduler/exe/coroutine/standalone.hpp b/fibers/scheduler/exe/coroutine/standalone.hpp new file mode 100644 index 0000000..420d02c --- /dev/null +++ b/fibers/scheduler/exe/coroutine/standalone.hpp @@ -0,0 +1,30 @@ +#include + +#include + +namespace exe::coroutine { + +// Standalone coroutine + +class Coroutine { + public: + explicit Coroutine(Routine routine); + + void Resume(); + + // Suspend current coroutine + static void Suspend(); + + bool IsCompleted() { + return impl_.IsCompleted(); + } + + private: + static context::Stack AllocateStack(); + + private: + context::Stack stack_; + CoroutineImpl impl_; +}; + +} // namespace exe::coroutine diff --git a/fibers/scheduler/exe/executors/execute.hpp b/fibers/scheduler/exe/executors/execute.hpp new file mode 100644 index 0000000..6054cb0 --- /dev/null +++ b/fibers/scheduler/exe/executors/execute.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include + +#include + +namespace exe::executors { + +namespace detail { +template +struct TaskBaseFunction : TaskBase { + explicit TaskBaseFunction(T&& task) : task_(std::move(task)) { + } + + void Run() noexcept override { + try { + task_(); + } catch (...) { + } + Discard(); + } + + void Discard() noexcept override { + delete this; + } + + private: + T task_; +}; +} // namespace detail + +/* + * Usage: + * Execute(thread_pool, []() { + * std::cout << "Hi" << std::endl; + * }); + */ + +template +void Execute(IExecutor& where, F&& f, Hint hint = Hint::UpToYou) { + where.Execute(new detail::TaskBaseFunction(std::move(f)), hint); +} +} // namespace exe::executors \ No newline at end of file diff --git a/fibers/scheduler/exe/executors/executor.hpp b/fibers/scheduler/exe/executors/executor.hpp new file mode 100644 index 0000000..865e6f5 --- /dev/null +++ b/fibers/scheduler/exe/executors/executor.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include + +namespace exe::executors { + +enum class Hint { + UpToYou = 1, // Rely on executor scheduling decision + Next = 2, // Use LIFO scheduling + Yield = 3 +}; + +struct IExecutor { + virtual ~IExecutor() = default; + + virtual void Execute(TaskBase* task, Hint hint) = 0; +}; + +} // namespace exe::executors diff --git a/fibers/scheduler/exe/executors/task.hpp b/fibers/scheduler/exe/executors/task.hpp new file mode 100644 index 0000000..54f69d5 --- /dev/null +++ b/fibers/scheduler/exe/executors/task.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include + +namespace exe::executors { + +struct ITask { + virtual ~ITask() = default; + + virtual void Run() noexcept = 0; + virtual void Discard() noexcept = 0; +}; + +// Intrusive task +struct TaskBase : ITask, wheels::IntrusiveForwardListNode {}; + +} // namespace exe::executors diff --git a/fibers/scheduler/exe/executors/thread_pool.hpp b/fibers/scheduler/exe/executors/thread_pool.hpp new file mode 100644 index 0000000..f625d8a --- /dev/null +++ b/fibers/scheduler/exe/executors/thread_pool.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace exe::executors { + +// Default thread pool +using ThreadPool = tp::fast::ThreadPool; + +} // namespace exe::executors diff --git a/fibers/scheduler/exe/executors/tp/compute/blocking_queue.hpp b/fibers/scheduler/exe/executors/tp/compute/blocking_queue.hpp new file mode 100644 index 0000000..5da3f78 --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/compute/blocking_queue.hpp @@ -0,0 +1,78 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace exe::executors { +class UnboundedBlockingQueue { + using T = TaskBase*; + + public: + bool Put(T value) { + std::lock_guard guard(mutex_); + + if (is_closed_) { + return false; + } + + not_empty_or_closed_.notify_one(); + deque_.emplace_back(std::move(value)); + + return true; + } + + std::optional Take() { + std::unique_lock lock(mutex_); + + while (deque_.empty()) { + if (is_closed_) { + return std::nullopt; + } + not_empty_or_closed_.wait(lock); + } + + return GetValue(); + } + + void Close() { + CloseImpl(false); + } + + void Cancel() { + CloseImpl(true); + } + + private: + std::optional GetValue() { + if (deque_.empty()) { + return std::nullopt; + } + T result = std::move(deque_.front()); + deque_.pop_front(); + return result; + } + + void CloseImpl(bool clear) { + std::lock_guard guard(mutex_); + is_closed_ = true; + for (auto& i : deque_) { + i->Discard(); + } + if (clear) [[likely]] { // NOLINT + deque_.clear(); + } + not_empty_or_closed_.notify_all(); + } + + bool is_closed_ = false; + twist::stdlike::condition_variable not_empty_or_closed_; + twist::stdlike::mutex mutex_; + std::deque deque_; +}; + +} // namespace exe::executors diff --git a/fibers/scheduler/exe/executors/tp/compute/thread_pool.cpp b/fibers/scheduler/exe/executors/tp/compute/thread_pool.cpp new file mode 100644 index 0000000..36f357e --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/compute/thread_pool.cpp @@ -0,0 +1,71 @@ +#include +#include +#include +#include + +#include + +namespace exe::executors::tp::compute { + +//////////////////////////////////////////////////////////////////////////////// + +static twist::util::ThreadLocalPtr pool; + +//////////////////////////////////////////////////////////////////////////////// + +ThreadPool::ThreadPool(size_t workers) : wait_end_tasks_(0) { + for (size_t i = 0; i < workers; ++i) { + threads_.emplace_back(std::bind(&ThreadPool::Worker, this)); + } +} + +void ExecuteTask(TaskBase* task) { + try { + task->Run(); + } catch (...) { + } +} + +void ThreadPool::Worker() { + pool = this; + + while (true) { + auto task = queue_.Take(); + if (!task) { + break; + } + + ExecuteTask(*task); + wait_end_tasks_.Done(); + } +} + +ThreadPool::~ThreadPool() { + assert(threads_.empty()); +} + +void ThreadPool::Execute(TaskBase* task, Hint) { + wait_end_tasks_.Add(); + if (!queue_.Put(task)) { + task->Discard(); + } +} + +void ThreadPool::WaitIdle() { + wait_end_tasks_.Wait(); +} + +void ThreadPool::Stop() { + queue_.Cancel(); + for (auto& i : threads_) { + i.join(); + } + + threads_.clear(); +} + +ThreadPool* ThreadPool::Current() { + return pool; +} + +} // namespace exe::executors::tp::compute diff --git a/fibers/scheduler/exe/executors/tp/compute/thread_pool.hpp b/fibers/scheduler/exe/executors/tp/compute/thread_pool.hpp new file mode 100644 index 0000000..46ad264 --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/compute/thread_pool.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include + +#include +#include + +namespace exe::executors::tp::compute { + +// Fixed-size pool of worker threads + +class ThreadPool : public IExecutor { + public: + explicit ThreadPool(size_t workers); + ~ThreadPool(); + + // Non-copyable + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + + // IExecutor + void Execute(TaskBase* task, Hint) override; + + // Waits until outstanding work count has reached zero + void WaitIdle(); + + // Stops the worker threads as soon as possible + // Pending tasks will be discarded + void Stop(); + + // Locates current thread pool from worker thread + static ThreadPool* Current(); + + private: + void Worker(); + + private: + std::vector threads_; + UnboundedBlockingQueue queue_; + support::WaitGroup wait_end_tasks_; +}; + +} // namespace exe::executors::tp::compute diff --git a/fibers/scheduler/exe/executors/tp/fast/coordinator.cpp b/fibers/scheduler/exe/executors/tp/fast/coordinator.cpp new file mode 100644 index 0000000..77415bd --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/coordinator.cpp @@ -0,0 +1,30 @@ +#include +#include + +namespace exe::executors::tp::fast { +Coordinator::Coordinator(ThreadPool& host, size_t workers) + : host_(host), workers_(workers), active_workers_(workers) { + for (size_t i = 0; i < workers; ++i) { + states_.emplace_back(WorkerState::kWorking); + } +} + +void Coordinator::StopSpinning() { + if (spinning_workers_.fetch_sub(1) == 1) { + if (available_tasks_.load() > active_workers_.load()) { + NotifyOneWorker(host_.workers_); + } + } +} + +void Coordinator::StartWorking(size_t index) { + if (states_[index].exchange(WorkerState::kWorking) == WorkerState::kWorking) { + NotifyOneWorker(host_.workers_); + } + active_workers_.fetch_add(1); +} + +void Coordinator::WakeUp() { + active_workers_.fetch_add(1); +} +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/coordinator.hpp b/fibers/scheduler/exe/executors/tp/fast/coordinator.hpp new file mode 100644 index 0000000..e7a6419 --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/coordinator.hpp @@ -0,0 +1,100 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace exe::executors::tp::fast { + +// Coordinates workers (stealing, parking) + +enum class WorkerState : uint8_t { kWorking, kSleeping }; + +class Coordinator { + public: + Coordinator(ThreadPool& host, size_t workers); + + void StartSpinning() { + spinning_workers_.fetch_add(1); + } + + void StopSpinning(); + + bool StartSleeping(size_t index) { + if (is_stopped.load()) { + return false; + } + + if (active_workers_.fetch_sub(1) <= available_tasks_.load()) { + active_workers_.fetch_add(1); + return false; + } + + states_[index].store(WorkerState::kSleeping); + + StopSpinning(); + + return true; + } + + void StartWorking(size_t index); + + void WakeUp(); + + void NotifyOneWorker(std::deque& workers) { + if (active_workers_.load() == workers_ || spinning_workers_.load() != 0) { + return; + } + for (size_t i = 0; i < workers_; ++i) { + if (states_[i].exchange(WorkerState::kWorking) == + WorkerState::kSleeping) { + workers[i].Notify(); + return; + } + } + } + + void NotifyAllWorkers(std::deque& workers) { + for (size_t i = 0; i < workers_; ++i) { + workers[i].Notify(); + } + } + + void WaitIdle() { + wait_finished_tasks_.Wait(); + } + + void PushedTask() { + wait_finished_tasks_.Add(); + available_tasks_.fetch_add(1); + } + + void RunningTask() { + available_tasks_.fetch_sub(1); + } + + void FinishTask() { + wait_finished_tasks_.Done(); + } + + twist::stdlike::atomic is_stopped{false}; + + private: + ThreadPool& host_; + size_t workers_ = 0; + twist::stdlike::atomic active_workers_{0}; + twist::stdlike::atomic spinning_workers_{0}; + twist::stdlike::atomic available_tasks_{0}; + std::deque> states_; + + support::WaitGroup wait_finished_tasks_; +}; + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/metrics.hpp b/fibers/scheduler/exe/executors/tp/fast/metrics.hpp new file mode 100644 index 0000000..ed6ec94 --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/metrics.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace exe::executors::tp::fast { + +struct WorkerMetrics { + size_t count_solved_tasks = 0; + size_t count_pick_lifo_slot = 0; + size_t count_task_from_local_queue = 0; + size_t count_grab_from_global_queue = 0; + size_t count_park = 0; + size_t count_spinning = 0; + size_t count_offload = 0; + size_t count_sleep_rejects = 0; + size_t count_stiling = 0; +}; + +struct PoolMetrics : WorkerMetrics { + // Your metrics goes here +}; + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/queues/global_queue.hpp b/fibers/scheduler/exe/executors/tp/fast/queues/global_queue.hpp new file mode 100644 index 0000000..783feab --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/queues/global_queue.hpp @@ -0,0 +1,71 @@ +#pragma once + +#include + +#include +#include + +#include + +#include +#include + +namespace exe::executors::tp::fast { + +// Unbounded queue shared between workers + +class GlobalQueue { + public: + void PushOne(TaskBase* item) { + std::lock_guard lock(guard_queue_); + + queue_.PushBack(item); + } + + void PushList(wheels::IntrusiveForwardList& list) { + std::lock_guard lock(guard_queue_); + queue_.Append(list); + } + + void Offload(std::span buffer) { + wheels::IntrusiveForwardList list; + for (auto& i : buffer) { + list.PushBack(i); + } + PushList(list); + } + + // Returns nullptr if queue is empty + TaskBase* TryPopOne() { + std::lock_guard lock(guard_queue_); + + return queue_.PopFront(); + } + + // Returns number of items in `out_buffer` + size_t Grab(std::span out_buffer, size_t workers) { + std::lock_guard lock(guard_queue_); + + size_t grab_size = (queue_.Size() + workers - 1) / workers; + + size_t current_grab = 0; + for (auto& i : out_buffer) { + i = queue_.PopFront(); + if (i == nullptr) { + break; + } + current_grab++; + if (current_grab == grab_size) { + break; + } + } + + return current_grab; + } + + private: + support::SpinLock guard_queue_; + wheels::IntrusiveForwardList queue_; +}; + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/queues/work_stealing_queue.hpp b/fibers/scheduler/exe/executors/tp/fast/queues/work_stealing_queue.hpp new file mode 100644 index 0000000..f77a4dd --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/queues/work_stealing_queue.hpp @@ -0,0 +1,130 @@ +#pragma once + +#include + +#include +#include +#include + +#include + +namespace exe::executors::tp::fast { + +// Single producer / multiple consumers bounded queue +// for local tasks + +template +class WorkStealingQueue { + // using T = TaskBase; + using T = TaskBase; + using Slot = T*; + + public: + WorkStealingQueue() { + } + + bool TryPush(T* item) { + size_t old_head = head_.load(std::memory_order::relaxed); + size_t old_tail = tail_.load(std::memory_order::relaxed); + + if (IsFull(old_head, old_tail)) { + return false; + } + + buffer_[old_tail % Capacity].store(item, std::memory_order::relaxed); + tail_.store(old_tail + 1, std::memory_order::release); + + return true; + } + + void PushManyTasks(std::span tasks) { + size_t old_head = head_.load(std::memory_order::acquire); + size_t old_tail = tail_.load(std::memory_order::relaxed); + + if (Capacity - Size(old_head, old_tail) < tasks.size()) { + WHEELS_PANIC("In work stealing queue is no free place"); + } + + for (size_t i = 0; i < tasks.size(); ++i) { + buffer_[(old_tail + i) % Capacity].store(tasks[i], + std::memory_order::relaxed); + } + + tail_.store(old_tail + tasks.size(), std::memory_order::release); + } + + // Returns nullptr if queue is empty + T* TryPop() { + T* result = nullptr; + if (GrabImpl({&result, &result + 1}, 1, std::memory_order::relaxed) == 0) { + return nullptr; + } + return result; + } + + // Returns number of tasks + size_t Grab(std::span out_buffer) { + return GrabImpl(out_buffer, 1, std::memory_order::acquire); + } + + size_t GrabPart(std::span out_buffer, size_t part) { + return GrabImpl(out_buffer, part, std::memory_order::acquire); + } + + void DiscardAll() { + std::array tasks; + size_t count = Grab(tasks); + for (auto task : std::span(tasks.begin(), tasks.begin() + count)) { + task->Discard(); + } + } + + private: + size_t GrabImpl(std::span out_buffer, size_t part, + std::memory_order memory_order_read_tail) { + size_t size_out_buffer = out_buffer.size(); + if (size_out_buffer == 0) { + return 0; + } + + size_t old_head = head_.load(std::memory_order::acquire); + + while (true) { + size_t old_tail = tail_.load(memory_order_read_tail); + size_t grab_size = + std::min(Size(old_head, old_tail) / part, size_out_buffer); + + if (grab_size == 0) { + return grab_size; + } + + for (size_t i = 0; i < grab_size; ++i) { + out_buffer[i] = + buffer_[(old_head + i) % Capacity].load(std::memory_order::relaxed); + } + + if (head_.compare_exchange_strong(old_head, old_head + grab_size, + std::memory_order::release, + std::memory_order::acquire)) { + return grab_size; + } + } + } + + static bool IsEmpty(size_t head, size_t tail) { + return Size(head, tail) == 0; + } + static bool IsFull(size_t head, size_t tail) { + return Size(head, tail) >= Capacity; + } + static size_t Size(size_t head, size_t tail) { + return head > tail ? 0 : tail - head; + } + + private: + std::array, Capacity> buffer_; + twist::stdlike::atomic tail_{0}; + twist::stdlike::atomic head_{0}; +}; + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/thread_pool.cpp b/fibers/scheduler/exe/executors/tp/fast/thread_pool.cpp new file mode 100644 index 0000000..3ea8f79 --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/thread_pool.cpp @@ -0,0 +1,76 @@ +#include + +#include + +namespace exe::executors::tp::fast { + +ThreadPool::ThreadPool(size_t threads) + : count_threads_(threads), coordinator_(*this, threads) { + for (size_t i = 0; i < threads; ++i) { + workers_.emplace_back(*this, i); + } + for (size_t i = 0; i < threads; ++i) { + workers_[i].Start(); + } +} + +ThreadPool::~ThreadPool() { + // if (!workers_.empty()) { + // std::abort(); + // } +} + +void ThreadPool::Execute(TaskBase* task, Hint hint) { + coordinator_.PushedTask(); + Push(task, hint); + coordinator_.NotifyOneWorker(workers_); +} + +void ThreadPool::Push(TaskBase* task, Hint hint) { + if (hint == Hint::Yield) { + global_tasks_.PushOne(task); + } else if (Worker::Current() != nullptr && + std::addressof(Worker::Current()->Host()) == this) { + if (hint == Hint::Next) { + Worker::Current()->PushToLifoSlot(task); + } else { + Worker::Current()->PushToLocalQueue(task); + } + } else { + global_tasks_.PushOne(task); + } +} + +void ThreadPool::WaitIdle() { + coordinator_.WaitIdle(); +} + +void ThreadPool::Stop() { + coordinator_.is_stopped.store(true); // TODO + coordinator_.NotifyAllWorkers(workers_); + for (auto& worker : workers_) { + worker.Join(); + } + while (auto next = global_tasks_.TryPopOne()) { + next->Discard(); + } + // workers_.clear(); +} + +std::vector ThreadPool::Metrics() const { + std::vector result; + for (auto& worker : workers_) { + result.emplace_back(worker.Metrics()); + } + return result; +} + +ThreadPool* ThreadPool::Current() { + auto current_worker = Worker::Current(); + if (current_worker == nullptr) { + return nullptr; + } + return std::addressof(current_worker->Host()); +} + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/thread_pool.hpp b/fibers/scheduler/exe/executors/tp/fast/thread_pool.hpp new file mode 100644 index 0000000..b089d43 --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/thread_pool.hpp @@ -0,0 +1,53 @@ +#pragma once + +#include + +#include +#include +#include +#include + +// random_device +#include + +#include + +namespace exe::executors::tp::fast { + +// Scalable work-stealing scheduler for short-lived tasks + +class ThreadPool : public IExecutor { + friend class Worker; + + public: + explicit ThreadPool(size_t threads); + ~ThreadPool(); + + // Non-copyable + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + + // IExecutor + void Execute(TaskBase* task, Hint hint = Hint::UpToYou) override; + + void WaitIdle(); + + void Stop(); + + // After Stop + std::vector Metrics() const; + + static ThreadPool* Current(); + + private: + void Push(TaskBase* task, Hint hint); + + private: + size_t count_threads_; + std::deque workers_; + friend class Coordinator; + Coordinator coordinator_; + GlobalQueue global_tasks_; +}; + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/worker.cpp b/fibers/scheduler/exe/executors/tp/fast/worker.cpp new file mode 100644 index 0000000..b8cc05f --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/worker.cpp @@ -0,0 +1,229 @@ +#include +#include + +#include + +namespace exe::executors::tp::fast { + +TWIST_DECLARE_TL_PTR(Worker, worker); + +Worker::Worker(ThreadPool& host, size_t index) : host_(host), index_(index) { +} + +void Worker::Start() { + thread_.emplace([this]() { + worker = this; + Work(); + }); +} + +void Worker::Join() { + thread_->join(); +} + +void Worker::PushToLocalQueue(TaskBase* task) { + if (!local_tasks_.TryPush(task)) { + OffloadTasksToGlobalQueue(task); + } +} + +void Worker::PushToLifoSlot(TaskBase* task) { + if (lifo_slot_ != nullptr) { + PushToLocalQueue(lifo_slot_); + } + lifo_slot_ = task; +} + +size_t Worker::StealTasks(std::span out_buffer) { + return local_tasks_.GrabPart(out_buffer, 2); +} + +TaskBase* Worker::PickNextTask() { + // [Periodically] Global queue + // 0) LIFO slot + // 1) Local queue + // 2) Global queue + // 3) Work stealing + // 4) Park worker + + TaskBase* result = TryPickNextTask(); + if (result != nullptr) { + return result; + } + + host_.coordinator_.StartSpinning(); + while (true) { + ++metrics_.count_spinning; + uint32_t old = wakeups_.load(); + + result = TryStealTasks(8); + if (result != nullptr) { + ++metrics_.count_stiling; + host_.coordinator_.StopSpinning(); + + if (host_.coordinator_.is_stopped.load()) { + result->Discard(); + return nullptr; + } + + return result; + } + + if ((result = GrabTasksFromGlobalQueue()) != nullptr) { + host_.coordinator_.StopSpinning(); + + if (host_.coordinator_.is_stopped.load()) { + result->Discard(); + return nullptr; + } + + return result; + } + + if (host_.coordinator_.StartSleeping(index_)) { + result = TryPickTaskBeforePark(); + + if (result != nullptr) { + host_.coordinator_.StartWorking(index_); + + if (host_.coordinator_.is_stopped.load()) { + result->Discard(); + return nullptr; + } + + return result; + } + + ++metrics_.count_park; + Park(old); + host_.coordinator_.WakeUp(); + host_.coordinator_.StartSpinning(); + } else { + ++metrics_.count_sleep_rejects; + } + + if (host_.coordinator_.is_stopped.load()) { + host_.coordinator_.StopSpinning(); + return nullptr; + } + } +} + +void Worker::Work() { + while (TaskBase* next = PickNextTask()) { + host_.coordinator_.RunningTask(); + next->Run(); + ++metrics_.count_solved_tasks; + host_.coordinator_.FinishTask(); + } + local_tasks_.DiscardAll(); +} + +void Worker::Park(uint32_t old) { + while (wakeups_.load() == old) { + wakeups_.wait(old); + } +} + +Worker* Worker::Current() { + return worker; +} + +void Worker::OffloadTasksToGlobalQueue(TaskBase* task) { + ++metrics_.count_offload; + std::array buffer; + size_t count = local_tasks_.Grab({buffer.begin(), buffer.end() - 1}); + buffer[count] = task; + host_.global_tasks_.Offload({buffer.begin(), buffer.begin() + count + 1}); +} + +TaskBase* Worker::GrabTasksFromGlobalQueue() { + std::array buffer; + + size_t count = host_.global_tasks_.Grab(buffer, host_.count_threads_); + + if (count > 0) { + ++metrics_.count_grab_from_global_queue; + local_tasks_.PushManyTasks({buffer.begin() + 1, buffer.begin() + count}); + return buffer[0]; + } + + return nullptr; +} + +TaskBase* Worker::TryStealTasksFrom(size_t index_worker) { + std::array buffer; + + size_t count = + host_.workers_[index_worker].StealTasks({buffer.begin(), buffer.end()}); + + if (count > 0) { + local_tasks_.PushManyTasks({buffer.begin() + 1, buffer.begin() + count}); + + return buffer[0]; + } + + return nullptr; +} + +TaskBase* Worker::TryStealTasks(size_t series) { + for (size_t i = 0; i < series; ++i) { + TaskBase* result = TryStealTasksFrom(twister_() % host_.count_threads_); + if (result != nullptr) { + return result; + } + } + return nullptr; +} + +TaskBase* Worker::TryPickTaskBeforePark() { + TaskBase* result = GrabTasksFromGlobalQueue(); + if (result != nullptr) { + return result; + } + + for (size_t i = 0; i < host_.count_threads_; ++i) { + result = TryStealTasksFrom(i); + if (result != nullptr) { + return result; + } + } + + return nullptr; +} + +TaskBase* Worker::TryPickNextTask() { + if (host_.coordinator_.is_stopped.load()) { + return nullptr; + } + + TaskBase* result; + + if (count_lifo_slot_ == 52) { + count_lifo_slot_ = 0; + if (lifo_slot_ != nullptr) { + PushToLocalQueue(std::exchange(lifo_slot_, nullptr)); + } + } else { + ++count_lifo_slot_; + result = std::exchange(lifo_slot_, nullptr); + if (result != nullptr) { + ++metrics_.count_pick_lifo_slot; + return result; + } + } + + result = local_tasks_.TryPop(); + if (result != nullptr) { + ++metrics_.count_task_from_local_queue; + return result; + } + + if ((result = GrabTasksFromGlobalQueue()) != nullptr) { + return result; + } + + return nullptr; +} + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/executors/tp/fast/worker.hpp b/fibers/scheduler/exe/executors/tp/fast/worker.hpp new file mode 100644 index 0000000..6de9e3d --- /dev/null +++ b/fibers/scheduler/exe/executors/tp/fast/worker.hpp @@ -0,0 +1,99 @@ +#pragma once + +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +namespace exe::executors::tp::fast { + +class Worker; +class ThreadPool; + +class Worker { + private: + static const size_t kLocalQueueCapacity = 256; + + public: + Worker(ThreadPool& host, size_t index); + + void Start(); + void Join(); + + // Submit task + void PushToLocalQueue(TaskBase* task); + void PushToLifoSlot(TaskBase* task); + + // Steal from this worker + size_t StealTasks(std::span out_buffer); + + // Wake parked worker + void Wake(); + + static Worker* Current(); + + WorkerMetrics Metrics() const { + return metrics_; + } + + ThreadPool& Host() const { + return host_; + } + + bool Notify() { + wakeups_.fetch_add(1); + wakeups_.notify_one(); + return true; + } + + private: + void OffloadTasksToGlobalQueue(TaskBase* task); + + TaskBase* TryStealTasksFrom(size_t index_worker); + TaskBase* TryStealTasks(size_t series); + TaskBase* GrabTasksFromGlobalQueue(); + TaskBase* TryPickNextTask(); + TaskBase* TryPickTaskBeforePark(); + + // Blocking + TaskBase* PickNextTask(); + + // Run Loop + void Work(); + + void Park(uint32_t); + + private: + ThreadPool& host_; + const size_t index_; + + // Worker thread + std::optional thread_; + + // Local queue + WorkStealingQueue local_tasks_; + + // For work stealing + std::mt19937_64 twister_{twist::stdlike::random_device()()}; + + // LIFO slot + TaskBase* lifo_slot_ = nullptr; + + // Parking lot + twist::stdlike::atomic wakeups_{0}; + + size_t count_lifo_slot_ = 0; + + WorkerMetrics metrics_; +}; + +} // namespace exe::executors::tp::fast diff --git a/fibers/scheduler/exe/fibers/core/api.hpp b/fibers/scheduler/exe/fibers/core/api.hpp new file mode 100644 index 0000000..5906613 --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/api.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include + +namespace exe::fibers { + +using Routine = coroutine::Routine; + +using Scheduler = executors::IExecutor; + +// Considered harmful + +// Starts a new fiber +void Go(Scheduler& scheduler, Routine routine); + +// Starts a new fiber in the current scheduler +void Go(Routine routine); + +namespace self { + +void Yield(executors::Hint hint = executors::Hint::Yield); + +// For synchronization primitives +// Do not use directly +void Suspend(exe::fibers::IContinuationStealingAwaiter*); + +} // namespace self + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/core/awaiter.hpp b/fibers/scheduler/exe/fibers/core/awaiter.hpp new file mode 100644 index 0000000..bba3aed --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/awaiter.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include + +namespace exe::fibers { + +struct IContinuationStealingAwaiter { + virtual FiberHandle TransferTo(FiberHandle fiber_handle) = 0; +}; + +class ContinuationStealingAwaiter : public IContinuationStealingAwaiter { + public: + explicit ContinuationStealingAwaiter(FiberHandle fiber_handle) + : fiber_handle_(fiber_handle) { + } + + FiberHandle TransferTo(FiberHandle fiber_handle) override final { + auto safe = fiber_handle_; + fiber_handle.Schedule(); + return safe; + } + + private: + FiberHandle fiber_handle_; +}; + +struct IAwaiter : IContinuationStealingAwaiter { + virtual FiberHandle TransferTo(FiberHandle fiber_handle) override final { + OnDispatch(fiber_handle); + return FiberHandle::Invalid(); + } + virtual void OnDispatch(FiberHandle fiber_handle) = 0; +}; + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/core/fiber.cpp b/fibers/scheduler/exe/fibers/core/fiber.cpp new file mode 100644 index 0000000..971aa9f --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/fiber.cpp @@ -0,0 +1,118 @@ +#include +#include +#include + +#include +#include + +namespace exe::fibers { +namespace detail { +void FiberTask::Run() noexcept { + // static_cast(this)->Resume(); + auto fiber_handle = static_cast(this)->Resume(); + if (fiber_handle.IsValid()) { + fiber_handle.Resume(); + } +} + +void FiberTask::Discard() noexcept { + delete static_cast(this); +} +} // namespace detail + +struct YieldAwaiter : IAwaiter { + explicit YieldAwaiter(executors::Hint hint) : hint(hint) { + } + void OnDispatch(FiberHandle fiber_handle) override { + fiber_handle.Schedule(hint); + } + executors::Hint hint; +}; + +////////////////////////////////////////////////////////////////////// + +static twist::util::ThreadLocalPtr fiber; + +Fiber::Fiber(Scheduler& scheduler, Routine co) + : scheduler_(scheduler), + stack_(AllocateStack()), + coroutine_(std::move(co), stack_.View()) { +} + +void Fiber::Schedule(executors::Hint hint) { + scheduler_.Execute(static_cast(this), hint); +} + +void Fiber::Yield(executors::Hint hint) { + YieldAwaiter awaiter(hint); + awaiter_ = &awaiter; + coroutine_.Suspend(); +} + +void Fiber::Step() { + Fiber* tmp = fiber.Exchange(this); + coroutine_.Resume(); + fiber.Exchange(tmp); +} + +FiberHandle Fiber::Dispatch() { + if (!coroutine_.IsCompleted()) { + auto tmp = awaiter_; + awaiter_ = nullptr; + return tmp->TransferTo(FiberHandle(this)); + // + // static_cast(tmp)->OnDispatch(FiberHandle(this)); + // return FiberHandle::Invalid(); + } else { + delete this; + return FiberHandle::Invalid(); + } +} + +Fiber& Fiber::Self() { + return *fiber; +} + +Fiber::~Fiber() { + ReleaseStack(std::move(stack_)); +} + +void Fiber::Suspend(IContinuationStealingAwaiter* awaiter) { + awaiter_ = awaiter; + coroutine_.Suspend(); +} + +FiberHandle Fiber::Resume() { + Step(); + return Dispatch(); +} + +Scheduler& Fiber::GetScheduler() { + return scheduler_; +} + +////////////////////////////////////////////////////////////////////// + +// API Implementation + +void Go(Scheduler& scheduler, Routine routine) { + Fiber* f = new Fiber(scheduler, std::move(routine)); + f->Schedule(); +} + +void Go(Routine routine) { + Go(Fiber::Self().GetScheduler(), std::move(routine)); +} + +namespace self { + +void Yield(executors::Hint hint) { + Fiber::Self().Yield(hint); +} + +void Suspend(exe::fibers::IContinuationStealingAwaiter* awaiter) { + Fiber::Self().Suspend(awaiter); +} +} // namespace self + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/core/fiber.hpp b/fibers/scheduler/exe/fibers/core/fiber.hpp new file mode 100644 index 0000000..f4767e8 --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/fiber.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include +#include + +#include // experiment + +enum FiberState { Running, Suspend, Waiting }; + +namespace exe::fibers { + +namespace detail { +struct FiberTask : public executors::TaskBase { + virtual void Run() noexcept override; + virtual void Discard() noexcept override; +}; +} // namespace detail + +// Fiber = Stackful coroutine + Scheduler (Thread pool) + +class Fiber : public detail::FiberTask { + friend detail::FiberTask; + + public: + explicit Fiber(Scheduler& scheduler, Routine co); + + void Schedule(executors::Hint hint = executors::Hint::UpToYou); + + void Yield(executors::Hint hint); + + FiberHandle Dispatch(); + void Suspend(IContinuationStealingAwaiter*); + FiberHandle Resume(); + Scheduler& GetScheduler(); + + static Fiber& Self(); + ~Fiber(); + + private: + // Task + void Step(); + + private: + Scheduler& scheduler_; + context::Stack stack_; + exe::coroutine::CoroutineImpl coroutine_; + IContinuationStealingAwaiter* awaiter_ = nullptr; +}; + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/core/handle.cpp b/fibers/scheduler/exe/fibers/core/handle.cpp new file mode 100644 index 0000000..f1ba13a --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/handle.cpp @@ -0,0 +1,27 @@ +#include + +#include + +#include + +#include + +namespace exe::fibers { + +Fiber* FiberHandle::Release() { + WHEELS_ASSERT(fiber_ != nullptr, "Invalid fiber handle"); + return std::exchange(fiber_, nullptr); +} + +void FiberHandle::Schedule(executors::Hint hint) { + Release()->Schedule(hint); +} + +void FiberHandle::Resume() { + auto fh = Release()->Resume(); + while (fh.IsValid()) { + fh = fh.Release()->Resume(); + } +} + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/core/handle.hpp b/fibers/scheduler/exe/fibers/core/handle.hpp new file mode 100644 index 0000000..5051ad2 --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/handle.hpp @@ -0,0 +1,41 @@ +#pragma once +#include + +namespace exe::fibers { + +class Fiber; + +// Lightweight non-owning handle to a _suspended_ fiber object + +class FiberHandle { + friend class Fiber; + + public: + FiberHandle() : FiberHandle(nullptr) { + } + + static FiberHandle Invalid() { + return FiberHandle(nullptr); + } + + bool IsValid() const { + return fiber_ != nullptr; + } + + // Schedule to an associated scheduler + void Schedule(executors::Hint hint = executors::Hint::UpToYou); + + // Resume immediately in the current thread + void Resume(); + + private: + explicit FiberHandle(Fiber* fiber) : fiber_(fiber) { + } + + Fiber* Release(); + + private: + Fiber* fiber_; +}; + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/core/stacks.cpp b/fibers/scheduler/exe/fibers/core/stacks.cpp new file mode 100644 index 0000000..9eaa381 --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/stacks.cpp @@ -0,0 +1,54 @@ +#include +#include +#include + +using context::Stack; + +namespace exe::fibers { + +////////////////////////////////////////////////////////////////////// + +class StackAllocator { + public: + Stack Allocate() { + { + std::lock_guard guard(guard_stacks_); + if (!stacks_.empty()) { + auto stack = std::move(stacks_.back()); + stacks_.pop_back(); + return stack; + } + } + + return AllocateNewStack(); + } + + void Release(Stack stack) { + std::lock_guard guard(guard_stacks_); + stacks_.push_back(std::move(stack)); + } + + private: + static Stack AllocateNewStack() { + static const size_t kStackPages = 16; // 16 * 4KB = 64KB + return Stack::AllocatePages(kStackPages); + } + + private: + std::deque stacks_; + support::SpinLock guard_stacks_; +}; + +////////////////////////////////////////////////////////////////////// + +StackAllocator allocator; + +context::Stack AllocateStack() { + return allocator.Allocate(); +} + +void ReleaseStack(context::Stack stack) { + allocator.Release(std::move(stack)); +} + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/core/stacks.hpp b/fibers/scheduler/exe/fibers/core/stacks.hpp new file mode 100644 index 0000000..f931f1a --- /dev/null +++ b/fibers/scheduler/exe/fibers/core/stacks.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include + +namespace exe::fibers { + +context::Stack AllocateStack(); +void ReleaseStack(context::Stack stack); + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/sync/condvar.hpp b/fibers/scheduler/exe/fibers/sync/condvar.hpp new file mode 100644 index 0000000..0cd5918 --- /dev/null +++ b/fibers/scheduler/exe/fibers/sync/condvar.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +#include + +// std::unique_lock +#include + +namespace exe::fibers { + +class CondVar { + using Lock = std::unique_lock; + + public: + void Wait(Lock& lock) { + uint32_t old = count_notifies_.load(); + lock.unlock(); + + notifier_.ParkIfEqual(old); + + lock.lock(); + } + + void NotifyOne() { + count_notifies_.fetch_add(1); + notifier_.WakeOne(); + } + + void NotifyAll() { + count_notifies_.fetch_add(1); + notifier_.WakeAll(); + } + + private: + twist::stdlike::atomic count_notifies_{0}; + FutexLike notifier_{count_notifies_}; +}; + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/sync/futex.hpp b/fibers/scheduler/exe/fibers/sync/futex.hpp new file mode 100644 index 0000000..2b53b9c --- /dev/null +++ b/fibers/scheduler/exe/fibers/sync/futex.hpp @@ -0,0 +1,103 @@ +#pragma once + +#include + +#include + +#include +#include +#include + +#include + +#include +#include + +namespace exe::fibers { + +template +concept Unlockable = requires(T x) { + x.Unlock(); +}; + +template +class FutexAwaiter : public IAwaiter, + public wheels::IntrusiveForwardListNode> { + public: + explicit FutexAwaiter(T& value) : value_(value) { + } + + void OnDispatch(FiberHandle fiber_handle) override { + fiber_handle_ = fiber_handle; + value_.Unlock(); + } + + void Schedule() { + fiber_handle_.Schedule(); + } + + private: + T& value_; + FiberHandle fiber_handle_; +}; + +template +class FutexLike { + public: + explicit FutexLike(twist::stdlike::atomic& cell) : cell_(cell) { + } + + ~FutexLike() { + if (!sleeping_queue_.IsEmpty()) { + assert(false); + } + } + + // Park current fiber if cell.load() == `old` + void ParkIfEqual(T old) { + guard_sleep_queue_.lock(); + if (cell_.load() != old) { + guard_sleep_queue_.unlock(); + return; + } + auto awaiter = FutexAwaiter(guard_sleep_queue_); + + sleeping_queue_.PushBack(&awaiter); + + self::Suspend(&awaiter); + } + + void WakeOne() { + std::lock_guard g(guard_sleep_queue_); + + if (sleeping_queue_.IsEmpty()) { + return; + } + + sleeping_queue_.PopFront()->Schedule(); + } + + void WakeAll() { + std::unique_lock lock(guard_sleep_queue_); + + if (sleeping_queue_.IsEmpty()) { + return; + } + + auto wakeup_queue = std::move(sleeping_queue_); + + lock.unlock(); + + while (!wakeup_queue.IsEmpty()) { + wakeup_queue.PopFront()->Schedule(); + } + } + + private: + exe::support::SpinLock guard_sleep_queue_; + twist::stdlike::atomic& cell_; + + wheels::IntrusiveForwardList> sleeping_queue_; +}; + +} // namespace exe::fibers \ No newline at end of file diff --git a/fibers/scheduler/exe/fibers/sync/mutex.hpp b/fibers/scheduler/exe/fibers/sync/mutex.hpp new file mode 100644 index 0000000..36b609c --- /dev/null +++ b/fibers/scheduler/exe/fibers/sync/mutex.hpp @@ -0,0 +1,57 @@ +#pragma once + +#include + +#include +#include + +namespace exe::fibers { + +class Mutex { + struct Node : public detail::mutex::IntrusiveNode { + FiberHandle fiber_handle; + }; + + struct MutexAwaiter : public IAwaiter { + explicit MutexAwaiter(Mutex& mutex) : mutex(mutex) { + } + + void OnDispatch(FiberHandle fiber_handle) override { + node.fiber_handle = fiber_handle; + if (mutex.awaiters_.TryLockOrEnqueue(&node)) { + node.fiber_handle.Schedule(executors::Hint::Next); + } + } + + Mutex& mutex; + Node node; + }; + + public: + void Lock() { + if (awaiters_.TryLock()) { + return; + } + MutexAwaiter mutex_awaiter(*this); + fibers::self::Suspend(&mutex_awaiter); + } + + void Unlock() { + auto node = awaiters_.TryPopOrUnlock(); + if (node != nullptr) { + auto awaiter = fibers::ContinuationStealingAwaiter(node->fiber_handle); + fibers::self::Suspend(&awaiter); + } + } + + void lock() { // NOLINT + Lock(); + } + + void unlock() { // NOLINT + Unlock(); + } + + detail::mutex::LockfreeIntrusiveQueue awaiters_; +}; +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/fibers/sync/wait_group.hpp b/fibers/scheduler/exe/fibers/sync/wait_group.hpp new file mode 100644 index 0000000..691ed1c --- /dev/null +++ b/fibers/scheduler/exe/fibers/sync/wait_group.hpp @@ -0,0 +1,57 @@ +#pragma once + +#include +#include + +namespace exe::fibers { + +// https://gobyexample.com/waitgroups + +namespace detail { +class WaitGroupParking { + public: + void Notify() { + done_.store(1); + notifier_.WakeAll(); + done_.store(2); + } + + void Wait() { + while (done_.load() == 0) { + notifier_.ParkIfEqual(0); + } + + while (done_.load() == 1) { + self::Yield(); + } + } + + private: + // 0 - wait done, 1 - wait end notify, 2 - released + twist::stdlike::atomic done_{0}; + FutexLike notifier_{done_}; +}; +} // namespace detail + +class WaitGroup { + public: + void Add(size_t count) { + counter_.fetch_add(count); + } + + void Done() { + if (counter_.fetch_sub(1) == 1) { + releaser_.Notify(); + } + } + + void Wait() { + releaser_.Wait(); + } + + private: + twist::stdlike::atomic counter_{0}; + detail::WaitGroupParking releaser_; +}; + +} // namespace exe::fibers diff --git a/fibers/scheduler/exe/support/lockfree_mutex_queue.hpp b/fibers/scheduler/exe/support/lockfree_mutex_queue.hpp new file mode 100644 index 0000000..ea3d375 --- /dev/null +++ b/fibers/scheduler/exe/support/lockfree_mutex_queue.hpp @@ -0,0 +1,108 @@ +#pragma once + +#include +#include + +#include + +namespace exe::fibers::detail::mutex { + +struct IntrusiveNode { + uintptr_t next = 0; +}; + +template +class LockfreeIntrusiveQueue { + using Node = IntrusiveNode; + + enum State : uintptr_t { kUnlock = 1, kLock = 0 }; + + public: + // if tail == kUnlock then try cas to kLock + // else try push + // return true if successful lock + bool TryLockOrEnqueue(Node* new_node) { + new_node->next = tail_.load(); + + while (true) { + if (new_node->next == State::kUnlock) { + if (tail_.compare_exchange_weak(new_node->next, State::kLock)) { + return true; + } + } else { + if (tail_.compare_exchange_weak(new_node->next, ToUIntPtr(new_node))) { + return false; + } + } + } + } + + T* TryPopOrUnlock() { + if (output_ == nullptr) { + auto output = GrabOrUnlock(); + if (output == nullptr) { + return nullptr; + } + output_ = output; + } + + return PopFromOutput(); + } + + T* PopFromOutput() { + return static_cast(std::exchange(output_, ToNode(output_->next))); + } + + bool TryLock() { + auto old = tail_.load(); + if (old == State::kUnlock) { + return tail_.compare_exchange_weak(old, State::kLock); + } + return false; + } + + Node* GrabOrUnlock() { + uintptr_t old = tail_.load(); + + while (true) { + if (old == State::kLock) { + if (tail_.compare_exchange_weak(old, State::kUnlock)) { + return nullptr; + } + } else { + return ReverseStack(tail_.exchange(State::kLock)); + } + } + } + + private: + static uintptr_t ToUIntPtr(Node* node) { + return reinterpret_cast(node); + } + + static Node* ToNode(uintptr_t ptr) { + return reinterpret_cast(ptr); + } + + static Node* ReverseStack(uintptr_t ptr) { + if (ptr == State::kUnlock || ptr == State::kLock) { + WHEELS_PANIC("reverse stack"); + } + uintptr_t prev = ptr; + ptr = ToNode(ptr)->next; + ToNode(prev)->next = State::kLock; + + while (ptr != State::kLock) { + uintptr_t next = ToNode(ptr)->next; + ToNode(ptr)->next = prev; + prev = ptr; + ptr = next; + } + + return ToNode(prev); + } + + twist::stdlike::atomic tail_{State::kUnlock}; + Node* output_ = nullptr; +}; +} // namespace exe::fibers::detail::mutex \ No newline at end of file diff --git a/fibers/scheduler/exe/support/spinlock.hpp b/fibers/scheduler/exe/support/spinlock.hpp new file mode 100644 index 0000000..9cbe9f2 --- /dev/null +++ b/fibers/scheduler/exe/support/spinlock.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include +#include + +namespace exe::support { + +// Test-and-TAS spinlock + +class SpinLock { + public: + void Lock() { + while (is_locked_.exchange(true, std::memory_order::acquire)) { + twist::util::SpinWait s; + while (is_locked_.load(std::memory_order::relaxed)) { + s.Spin(); + } + } + } + + void Unlock() { + is_locked_.store(false, std::memory_order::release); + } + + // BasicLockable + + void lock() { // NOLINT + Lock(); + } + + void unlock() { // NOLINT + Unlock(); + } + + private: + twist::stdlike::atomic is_locked_{false}; +}; + +} // namespace exe::support diff --git a/fibers/scheduler/exe/support/wait_group.hpp b/fibers/scheduler/exe/support/wait_group.hpp new file mode 100644 index 0000000..e03ed93 --- /dev/null +++ b/fibers/scheduler/exe/support/wait_group.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include +#include +#include + +namespace exe::support { +class WaitGroup { + public: + explicit WaitGroup(uint32_t count = 0) : counter_(count) { + } + + void Add() { + zero_notifier_.store(0); + counter_.fetch_add(1); + } + + void Done() { + if (counter_.fetch_sub(1) == 1) { + zero_notifier_.fetch_add(1); + zero_notifier_.notify_all(); + } + } + + void Wait() { + uint32_t old_zero_notifier = 0; + while (counter_.load() != 0) { + zero_notifier_.wait(old_zero_notifier); + } + } + + private: + twist::stdlike::atomic counter_{0}; + twist::stdlike::atomic zero_notifier_{0}; +}; +} // namespace exe::support \ No newline at end of file