executor/fibers/scheduler/exe/support/lockfree_mutex_queue.hpp

108 lines
2.3 KiB
C++
Raw Permalink Normal View History

2024-04-22 15:02:31 +00:00
#pragma once
#include <cstdint>
#include <utility>
#include <vector>

#include <twist/stdlike/atomic.hpp>
namespace exe::fibers::detail::mutex {
// Link embedded into objects that are stored in LockfreeIntrusiveQueue.
struct IntrusiveNode {
  // Integer-encoded link. LockfreeIntrusiveQueue stores here either the
  // address of the neighbouring node or one of its state sentinels
  // (whatever tail_ held when the node was pushed). Starts out as 0.
  uintptr_t next{0};
};
template <typename T>
class LockfreeIntrusiveQueue {
using Node = IntrusiveNode;
enum State : uintptr_t { kUnlock = 1, kLock = 0 };
public:
// if tail == kUnlock then try cas to kLock
// else try push
// return true if successful lock
bool TryLockOrEnqueue(Node* new_node) {
new_node->next = tail_.load();
while (true) {
if (new_node->next == State::kUnlock) {
if (tail_.compare_exchange_weak(new_node->next, State::kLock)) {
return true;
}
} else {
if (tail_.compare_exchange_weak(new_node->next, ToUIntPtr(new_node))) {
return false;
}
}
}
}
T* TryPopOrUnlock() {
if (output_ == nullptr) {
auto output = GrabOrUnlock();
if (output == nullptr) {
return nullptr;
}
output_ = output;
}
return PopFromOutput();
}
T* PopFromOutput() {
return static_cast<T*>(std::exchange(output_, ToNode(output_->next)));
}
bool TryLock() {
auto old = tail_.load();
if (old == State::kUnlock) {
return tail_.compare_exchange_weak(old, State::kLock);
}
return false;
}
Node* GrabOrUnlock() {
uintptr_t old = tail_.load();
while (true) {
if (old == State::kLock) {
if (tail_.compare_exchange_weak(old, State::kUnlock)) {
return nullptr;
}
} else {
return ReverseStack(tail_.exchange(State::kLock));
}
}
}
private:
static uintptr_t ToUIntPtr(Node* node) {
return reinterpret_cast<uintptr_t>(node);
}
static Node* ToNode(uintptr_t ptr) {
return reinterpret_cast<Node*>(ptr);
}
static Node* ReverseStack(uintptr_t ptr) {
if (ptr == State::kUnlock || ptr == State::kLock) {
WHEELS_PANIC("reverse stack");
}
uintptr_t prev = ptr;
ptr = ToNode(ptr)->next;
ToNode(prev)->next = State::kLock;
while (ptr != State::kLock) {
uintptr_t next = ToNode(ptr)->next;
ToNode(ptr)->next = prev;
prev = ptr;
ptr = next;
}
return ToNode(prev);
}
twist::stdlike::atomic<uintptr_t> tail_{State::kUnlock};
Node* output_ = nullptr;
};
} // namespace exe::fibers::detail::mutex