Hi everybody,
recently on #Discord a question came up about how to handle/use guards inside lifetime hooks. I'm working on an AtomicSharedPtr[T] and struggle with the handling of atomic member(s) inside lifetime hooks. Example:
type
  Ctrl[T] = object ## the control-block for a value of T and its global reference-count.
    val: T
    rc: Atomic[int]

  TagPtr = object ## this shared-pointer holds a local reference-count in the upper 16-bits of `ptr Ctrl[T]`.
    pt {.bitsize: 48.}: int     ## the remaining 48-bit for pointer-info (amd64-only).
    lc {.bitsize: 16.}: uint16  ## the local reference-count -> how many threads currently access the ctrl-block.

  AtomicSharedPtr*[T] = object
    ctrlBlock*: Atomic[ptr Ctrl[T]] ## to read this atomic-ptr, it has to be mutable.

  ASP[T] = object ## a utility to *cast-away* the memory-ordering.
    ctrlBlock: ptr Ctrl[T]
proc `=copy`*[T](dest: var AtomicSharedPtr[T], src: AtomicSharedPtr[T]) =
  var tmp = cast[ptr ASP[T]](src.addr) # cast to ASP-utility to read the pointer (non-atomically).
  if tmp.ctrlBlock != nil:
    dest.ctrlBlock.store tmp.ctrlBlock # use the ctrl-block with destination.
    discard dest.incRef
  else:
    `=destroy`(dest)
The cast, e.g. in the `=copy` or `=dup` hooks, works, but somehow feels a bit wrong? No idea if casting away the atomicity like this has more subtle consequences? Maybe it should be done differently. This code is based on threading/smartptrs. The idea behind this lock-free shared pointer is called 'split reference count'. Facebook's folly library uses it in production. Daniel Anderson describes the inner workings in his CppCon 2024 talk "Lock-free Atomic Shared Pointers Without a Split Reference Count? It Can Be Done!".

Alternatively, is there a reason why ctrlBlock*: Atomic[ptr Ctrl[T]] has to be atomic?
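Back to the cast in `=copy`: the least intrusive variant I can think of keeps the load itself atomic and only casts away the immutability of the field, not its Atomic wrapper. Just a sketch against the types above; the moAcquire ordering and the reuse of incRef are my assumptions:

import std/atomics

proc `=copy`*[T](dest: var AtomicSharedPtr[T], src: AtomicSharedPtr[T]) =
  # Cast only the field's address: the load stays atomic, we merely
  # work around `src` being an immutable parameter.
  let srcCtrl = cast[ptr Atomic[ptr Ctrl[T]]](addr src.ctrlBlock)[].load(moAcquire)
  if srcCtrl != nil:
    dest.ctrlBlock.store srcCtrl
    discard dest.incRef        # same incRef as in the snippet above
  else:
    `=destroy`(dest)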
I used the following modelling here which doesn't cast: https://github.com/mratsim/weave/blob/v0.4.10/weave/cross_thread_com/flow_events.nim
type
  FlowEvent* = object
    e: EventPtr

  EventKind = enum
    Single
    Iteration

  EventIter = object
    ## Payload

  SingleEvent = object
    ## Payload

  EventPtr = ptr object
    refCount: Atomic[int32]
    kind: EventKind
    union: EventUnion
proc `=destroy`*(event: var FlowEvent) =
  if event.e.isNil:
    return

  let count = event.e.refCount.load(moRelaxed)
  fence(moAcquire)
  if count == 0:
    # We have the last reference
    if not event.e.isNil:
      if event.e.kind == Iteration:
        wv_free(event.e.union.iter.singles)
      # Return memory to the memory pool
      recycle(event.e)
  else:
    discard fetchSub(event.e.refCount, 1, moRelease)
  event.e = nil
proc `=sink`*(dst: var FlowEvent, src: FlowEvent) {.inline.} =
  # Don't pay for atomic refcounting when compiler can prove there is no ref change
  `=destroy`(dst)
  system.`=sink`(dst.e, src.e)

proc `=`*(dst: var FlowEvent, src: FlowEvent) {.inline.} =
  `=destroy`(dst)
  discard fetchAdd(src.e.refCount, 1, moRelaxed)
  dst.e = src.e
Note that = should be =copy nowadays.
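With a current compiler that last hook would simply be renamed; same body, just the `=copy` name (sketch):

proc `=copy`*(dst: var FlowEvent, src: FlowEvent) {.inline.} =
  `=destroy`(dst)
  discard fetchAdd(src.e.refCount, 1, moRelaxed)
  dst.e = src.e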
I don't see you using the TagPtr yet, but you might be interested in how I do it there. Note that bitfields/bitsize don't work if you need atomic updates; you need to use masked fetchAdd/fetchSub. Btw, I think your types are wrong:
Example 1 where I map a bitfield to an atomic (but not a concurrent count, there can only be 2 readers-writers) https://github.com/mratsim/constantine/blob/v0.2.0/constantine/threadpool/crossthread/tasks_flowvars.nim#L31-L46
type
  TaskState = object
    ## This state allows synchronization between:
    ## - a waiter that may sleep if no work and task is incomplete
    ## - a thief that completes the task
    ## - a waiter that frees task memory
    ## - a waiter that will pick up the task continuation
    ##
    ## Supports up to 2¹⁵ = 32768 threads
    completed: Futex
    synchro: Atomic[uint32]
    # type synchro = object
    #   canBeFreed {.bitsize: 1.}: uint32 - Transfer ownership from thief to waiter
    #   pad        {.bitsize: 1.}: uint32
    #   waiterID   {.bitsize: 15.}: uint32 - ID of the waiter blocked on the task completion.
    #   thiefID    {.bitsize: 15.}: uint32 - ID of the worker that stole and run the task. For leapfrogging.
# Tasks have the following lifecycle:
# - A task creator that schedules a task on its queue
# - A task runner, task creator or thief, that runs the task
# - Once the task is finished:
#   - if the task has no future, the task runner frees the task
#   - if the task has a future,
#     - the task runner can immediately pick up new work
#     - the awaiting thread frees the task
#     - the awaiting thread might be sleeping and need to be woken up.
#
# There is a delicate dance as we need to prevent 2 issues:
#
# 1. A deadlock: if the waiter is never woken up after the thief completes the task
# 2. A use-after-free: if the thief tries to access the task after the waiter frees it.
#
# To solve 1, we need to set a `completed` flag, then check again if the waiter parked before.
# To solve 2, we either need to ensure that after the `completed` flag is set, the task runner
# doesn't access the task anymore, which is impossible due to 1;
# or we have the waiter spinlock on another flag `canBeFreed`.
const # bitfield setup
  kCanBeFreedShift = 31
  kCanBeFreed      = 1'u32 shl kCanBeFreedShift
  kCanBeFreedMask  = kCanBeFreed # 0x80000000

  kWaiterShift = 15
  kThiefMask   = (1'u32 shl kWaiterShift) - 1 # 0x00007FFF
  kWaiterMask  = kThiefMask shl kWaiterShift  # 0x3FFF8000

  SentinelWaiter = high(uint32) and kWaiterMask
  SentinelThief* = high(uint32) and kThiefMask
proc initSynchroState*(task: ptr Task) {.inline.} =
  task.state.completed.store(0, moRelaxed)
  task.state.synchro.store(SentinelWaiter or SentinelThief, moRelaxed)

# Flowvar synchronization
# -----------------------

proc isGcReady*(task: ptr Task): bool {.inline.} =
  ## Check if task can be freed by the waiter if it was stolen
  (task.state.synchro.load(moAcquire) and kCanBeFreedMask) != 0

proc setGcReady*(task: ptr Task) {.inline.} =
  ## Thief transfers full task ownership to waiter
  discard task.state.synchro.fetchAdd(kCanBeFreed, moRelease)

proc isCompleted*(task: ptr Task): bool {.inline.} =
  ## Check task completion
  task.state.completed.load(moAcquire) != 0

proc setCompleted*(task: ptr Task) {.inline.} =
  ## Set a task to `complete`
  ## Wake a waiter thread if there is one
  task.state.completed.store(1, moRelease) # Correctness on weak memory models like ARM: tests/t_ethereum_eip4844_deneb_kzg_parallel.nim
  fence(moSequentiallyConsistent)          # Avoid deadlock on Windows: benchmarks-threadpool/fibonacci/threadpool_fib.nim
  let waiter = task.state.synchro.load(moRelaxed)
  if (waiter and kWaiterMask) != SentinelWaiter:
    task.state.completed.wake()

proc sleepUntilComplete*(task: ptr Task, waiterID: int32) {.inline.} =
  ## Sleep while waiting for task completion
  let waiter = (cast[uint32](waiterID) shl kWaiterShift) - SentinelWaiter
  discard task.state.synchro.fetchAdd(waiter, moRelaxed)
  fence(moAcquire)
  while task.state.completed.load(moRelaxed) == 0:
    task.state.completed.wait(0)

# Leapfrogging synchronization
# ----------------------------

proc getThief*(task: ptr Task): uint32 {.inline.} =
  task.state.synchro.load(moAcquire) and kThiefMask

proc setThief*(task: ptr Task, thiefID: int32) {.inline.} =
  let thief = cast[uint32](thiefID) - SentinelThief
  discard task.state.synchro.fetchAdd(thief, moRelease)
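To make the sentinel trick used by setThief/sleepUntilComplete explicit, here is my reading of the code above as plain uint32 arithmetic, nothing atomic; the constants are re-stated so the snippet stands alone:

const
  kWaiterShift = 15
  kThiefMask   = (1'u32 shl kWaiterShift) - 1 # 0x00007FFF
  kWaiterMask  = kThiefMask shl kWaiterShift  # 0x3FFF8000
  SentinelWaiter = high(uint32) and kWaiterMask
  SentinelThief  = high(uint32) and kThiefMask

var synchro = SentinelWaiter or SentinelThief   # as set by initSynchroState
let thiefID = 42'i32
# Because the thief bits still hold their sentinel, a single wrapping add
# replaces them with the ID and leaves every other bit untouched,
# so a plain fetchAdd suffices and no CAS loop is needed:
synchro += cast[uint32](thiefID) - SentinelThief
assert (synchro and kThiefMask) == 42'u32
assert (synchro and kWaiterMask) == SentinelWaiter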
Example 2 with 2 incremented/decremented counts: https://github.com/mratsim/constantine/blob/v0.2.0/constantine/threadpool/crossthread/backoff.nim#L21-L68
type
  Eventcount* = object
    ## The lock-free equivalent of a condition variable.
    ##
    ## Usage, if a thread needs to be parked until a condition is true
    ## and signaled by another thread:
    ##
    ## ```Nim
    ## if condition:
    ##   return
    ##
    ## while true:
    ##   ticket = ec.sleepy()
    ##   if condition:
    ##     ec.cancelSleep()
    ##     break
    ##   else:
    ##     ec.sleep()
    ## ```
    waitset: Atomic[uint32]
    # type waitset = object
    #   preSleep       {.bitsize: 16.}: uint32
    #   committedSleep {.bitsize: 16.}: uint32
    #
    # We need precise committed sleep count for the `syncAll` barrier because
    # a `preSleep` waiter may steal a task and create more work.
    events: Futex

  ParkingTicket* = object
    epoch: uint32

const # bitfield setup
  # Low 16 bits are waiters, up to 2¹⁶ = 65536 threads are supported
  # Next 16 bits are pre-waiters, planning to wait but not committed.
  #
  # OS limitations:
  # - Windows 10 supports up to 256 cores (https://www.microsoft.com/en-us/microsoft-365/blog/2017/12/15/windows-10-pro-workstations-power-advanced-workloads/)
  # - Linux CPUSET supports up to 1024 threads (https://man7.org/linux/man-pages/man3/CPU_SET.3.html)
  #
  # Hardware limitations:
  # - Xeon Platinum 8490H, 60C/120T per socket
  # - up to 8 sockets: 960 threads
  kPreWaitShift = 8'u32
  kPreWait      = 1'u32 shl kPreWaitShift
  kWait         = 1'u32
  kCommitToWait = kWait - kPreWait
  kWaitMask     = kPreWait-1
  kPreWaitMask  = not kWaitMask
Yep, thankfully the shared ptr Ctrl[T] can be mutated via dest - otherwise it's an infinite loop (I stopped it after 2000 iterations). The dest.incRef performs the mutation (for both src & dest).
So do I understand correctly that a `=destroy`[T](obj: var T) will be allowed in the future - just because I noticed the compiler giving a deprecation warning?
So do I understand correctly that a `=destroy`[T](obj: var T) will be allowed in the future
When I developed the library, var T worked and plain T didn't compile. I haven't retried switching to plain T since then.
So do I understand correctly that a `=destroy`[T](obj: var T) will be allowed in the future - just because I noticed the compiler giving a deprecation warning (since 2.2.0)? I'm on devel.
There are no plans to allow it in the future unless some form of locking requires it that I didn't consider. So far I found no case where it's required.
There are no plans to allow it in the future unless some form of locking requires it that I didn't consider. So far I found no case where it's required.
I see, well you might take a look at @mratsim's event destructor from above:
proc `=destroy`*(event: var FlowEvent) =
  if event.e.isNil:
    return
  let count = event.e.refCount.load(moRelaxed)
  fence(moAcquire)
  ...
Such a destructor needs it, and so does basically anything lockless when atomics are involved. So maybe some exceptions might be a nice idea? 8=) What's the big gain on the compiler side of things from getting rid of mutability inside destructors? On my side, I'd have many types that need it. The atomic shared pointers in C++20 and MSVC use locks, which would require mutability inside lifetime hooks - that was the case for @planetis on Discord, AFAIK.

These can all be done like so:
type
  MyObject {.byref.} = object # lock is attached to a particular stack location so never use pass-by-copy
    x: Lock

proc `=destroy`(x: MyObject) =
  let v = addr x
  atomicStore v.refcount, 0
However, by this logic we never need var T parameters anyway. So your point is valid and the whole thing would be much less hacky with var T destructors...
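A slightly fleshed-out version of that sketch, with the field names made consistent - the Atomic[int] refcount and the demo proc are just assumptions for illustration:

import std/atomics

type
  MyObject {.byref.} = object   # always passed by address, so the hook sees the real stack location
    refcount: Atomic[int]

proc `=destroy`(x: MyObject) =
  # non-var signature, but we can still mutate through the parameter's address
  let v = addr x                # Nim 2.x: addr works on any addressable location
  v.refcount.store(0, moRelaxed)

proc demo() =
  var obj = MyObject()
  obj.refcount.store(1, moRelaxed)
  # obj is destroyed at scope exit; `=destroy` clears the refcount in place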
Alternatively, is there a reason why ctrlBlock*: Atomic[ptr Ctrl[T]] has to be atomic?
Yes. The implementations in C++20 and MSVC use locks. The 'split reference count' uses the upper 16 bits for a local reference count. All incRef/decRef operations happen inside a CAS loop. As you mentioned and have already done in weave - thanks for opening up your treasure trove once again, especially the Futex implementations - atomic bitops or fetchAdd would be more efficient, but that's not possible in the atomic shared-pointer case. The tricky part happens when a thread's CAS has failed: it can detect the reason for the failure - either some other thread changed the local refcount (-> retry), or the ctrlBlock pointer itself has changed (see the sketch below).

The operations are more expensive, but IMHO they deliver on the promise that everybody believed the C++11 shared pointer would bring to the table - until we learned that the atomic ref-counting only protects the ref-counts and not the referenced T. The atomic SharedPtr[T] is designed to do exactly that and does not require an additional safe-memory-reclamation scheme. So I assume one can do an efficient concurrent HashSet with Nim's included batteries -> heap-allocated ctrl-blocks and ARC/ORC lifetime hooks. No hazard pointers, EBR or whatever required. I think this is quite attractive. Sure, traversals on a hash set will be faster with RCU/EBR or the like, but a concurrent table should be very fast and easy to implement. This combination might be a poor man's concurrent ref-counting with deferred reclamation - I like it.

I looked at FRC, which @Araq had suggested in an earlier forum thread as a possible future replacement for ARC/ORC. I must admit, so far I have failed on FRC :) - but it is sadly abandoned by its trio of authors and has not been discussed further in the recent literature. Daniel Anderson mentions FRC, but did not include it in his competitive evaluations for technical reasons.
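To make the 'detect why the CAS failed' part concrete, here is roughly what I mean. A sketch under my own simplified packing assumptions - low 48 bits hold the ptr Ctrl[T], the top 16 bits the local count; names are made up and the real code is more involved:

import std/atomics

const
  kPtrMask   = (1'u64 shl 48) - 1   # pointer bits
  kLocalUnit = 1'u64 shl 48         # +1 in the local reference-count

proc tryIncLocalRef(slot: var Atomic[uint64], seen: uint64): bool =
  ## Try to bump the local count of the control block we observed as `seen`.
  ## Returns false if the slot meanwhile points at a different control block.
  var expected = seen
  while true:
    let desired = expected + kLocalUnit
    if slot.compareExchangeWeak(expected, desired, moAcquire):
      return true
    # CAS failed; `expected` was refreshed with the current word.
    if (expected and kPtrMask) != (seen and kPtrMask):
      return false   # the ctrl-block pointer itself changed -> caller restarts
    # only the local count changed -> retry with the refreshed value

On success the thread holds a local reference on the control block it saw; on a pointer change the whole load is restarted, which is exactly the part that makes this more expensive than a plain fetchAdd.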
So to sum it up: the combination of ARC/ORC plus pools and some SMR scheme seems to work well in weave, and in my stuff too. A lock-free atomic shared pointer could finally satisfy the expectation that one can do, e.g., a concurrent Stack or Table without SMR or an explicit concurrent reference counter. One more option that works on any platform that supports atomics AND can provide the safety guarantees of hazard pointers - as it's robust, whereas EBR is not. So leave it to the science folks to find the best of both RCU and hazard pointers, while Nim can already offer exactly that: if you need robustness and safety, use an atomic shared pointer; otherwise combine pools and ARC/ORC.
Concurrent ref-counting might be nice to have, but performance-wise it will always stay behind RCU (EBR and friends).
I used the following modelling here which doesn't cast: > https://github.com/mratsim/weave/blob/v0.4.10/weave/cross_thread_com/flow_events.nim
When I developed the library, var T worked and plain T didn't compile. I haven't retried switching to plain T since then.
You'd see a warning with recent Nim. But as Araq pointed out, it can easily be worked around using {.byref.} - here I learned something new, thx Araq.
What about the plan to integrate Weave into Nim? My impression is that many good things would come to Nim - Futexes and concurrent recycling allocators, just to name a few I've seen.
Recently I noticed that you supported Eliah by doing a code review on his nim-locklessqueues module. I used its SPSC queue to compare it with the Frasch queue (presented by its author Charles Frasch at CppCon 2023), which combines ideas from Rigtorp and Vyukov. On my machine the SPSC queue from nim-locklessqueues peaks at 60 million operations/sec. The Frasch queue performed 4-7x better, up to 350 million ops/sec. I have not yet published it, but it's just ~70 LOC. If you need an SPSC queue in weave, I'd paste it somewhere.
greets Andreas
What about the plan to integrate Weave into Nim ?
The default threadpool in Nim should be able to handle IO, in particular blocking calls waiting for sockets, or tasks submitted from outside the threadpool.
A better base would be Constantine's threadpool which I isolated in https://github.com/mratsim/weave-io.
The main differences are listed here: https://github.com/mratsim/constantine/blob/v0.2.0/constantine/threadpool/README.md
This implements a lightweight, energy-efficient, easily auditable multithreaded threadpool.
This threadpool's desirable properties are:
- Ease of auditing and maintenance.
- Resource-efficient. Threads spindown to save power, low memory use.
- Decent performance and scalability. The CPU should spend its time processing user workloads and not dealing with threadpool contention, latencies and overheads.
Compared to Weave, here are the tradeoffs:
- Constantine's threadpool provides spawn/sync (task parallelism) and an optimized parallelFor (data parallelism). It does not, however, provide precise in/out dependencies (events / dataflow parallelism).
- Constantine's threadpool has been significantly optimized to provide overhead lower than Weave's default (and as low as Weave "lazy" + "alloca" allocation scheme).
Compared to nim-taskpools, here are the tradeoffs:
- Constantine does not use std/tasks:
- No external dependencies at runtime (apart from compilers, OS and drivers)
- We can replace Task with an intrusive linked list
- Furthermore we can embed tasks in their future
- Hence allocation/scheduler overhead is 3x less than nim-taskpools as we fuse the following allocations:
- Task
- The linked list of tasks
- The future (Flowvar) result channel
- Contention improvement, Constantine is entirely lock-free while Nim-taskpools need a lock+condition variable for putting threads to sleep
- Powersaving improvement, threads sleep when awaiting a task and there is no work available.
- Scheduling improvement, Constantine's threadpool incorporates Weave's adaptive scheduling policy with an additional enhancement (leapfrogging)
And a full design writeup in: https://github.com/mratsim/constantine/blob/v0.2.0/docs/threadpool-design.md
A couple more things:
One day, when I have the time, I'll make the necessary changes to Weave-io, but PRs are accepted and I can provide guidance as well.