As I mentioned previously, I am working on a library to declare threads as servers with specific handlers: you can send messages to such a thread, and it executes user-defined handler procs based on the message type it receives.
I wanted to allow users to define async handlers. While trying to get that to work, and taking special care with sanitizers, helgrind, and memcheck that everything is as safe as it can be (multithreading is a dangerous beast), I noticed that asyncCheck causes memory leaks for me.
A minimal artificial example:
import std/asyncdispatch

var count: int

proc stuff() {.async.} =
  echo count, 1
  await sleepAsync(500)
  echo count, 2
  count.inc

for _ in 0..<10:
  asyncCheck stuff()

while hasPendingOperations(): poll()
doAssert count == 10
This will lead to memory leaks, as is visible when compiling with e.g. the address sanitizer:
nim r --panics:on --define:danger --mm:arc --cc:clang --debugger:native --threads:on -d:butlerThreading -d:chronicles_enabled=off -d:useMalloc --debuginfo:on --linedir:on --passC:'-fno-omit-frame-pointer' --passC:'-mno-omit-leaf-frame-pointer' --passC:"-fsanitize=address" --passL:"-fsanitize=address" src/playground.nim
The code above will not leak memory if you just discard the future.
The problem here appears to be a cycle. I don't fully understand it myself, but leorize tried to explain it to me:
asyncCheck appears to construct a closure callback which holds a ref to the future being constructed by stuff(). The callback is then placed into the future itself (?), causing a cycle that arc can no longer deal with.
This is problematic for me, as I do not want to force users to use orc when they just want/need arc. Only constrain users where you need to and all that.
Now, what can I do manually (in the short term) to solve that problem for now (besides opening a GitHub issue, which I plan on doing)?
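One short-term idea (my own sketch, not something from the thread): skip asyncCheck entirely and keep the futures in a seq, then check them for errors after draining the dispatcher. This only removes the asyncCheck callback cycle specifically; the async transformation itself may still create cycles elsewhere:

```nim
import std/asyncdispatch

var count: int

proc stuff() {.async.} =
  await sleepAsync(10)
  count.inc

# Keep the futures ourselves instead of handing them to asyncCheck,
# so no error callback capturing the future is stored on the future.
var pending: seq[Future[void]]
for _ in 0..<10:
  pending.add stuff()

while hasPendingOperations(): poll()

# Replicate asyncCheck's error reporting manually, after completion.
for fut in pending:
  if fut.failed:
    echo "Async task failed: ", fut.error.msg

doAssert count == 10
```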
In case anyone is curious, the above is derived from my more realistic threadServer code. I have my own event-loop that runs on said thread-server, which polls its channel for messages and executes handlers with said message if there is one. Those handlers may be async, thus causing problems.
proc routeMessage*(msg: sink ServerMessage; hub: ChannelHub) =
  case msg.kind
  of RequestKind:
    asyncCheck(handleRequestOnServer(msg.requestMsg, hub)) # Just some async handler proc
  of KillServerKind:
    shutdownServer()
proc runServerLoop[Msg](data: Server[Msg]) {.gcsafe.} =
  mixin routeMessage
  while keepRunning():
    var msg: Option[Msg] = data.hub.readMsg(Msg)
    try:
      while msg.isSome():
        {.gcsafe.}:
          routeMessage(msg.get(), data.hub) # May cause async operations to be pushed onto the global dispatcher
        msg = data.hub.readMsg(Msg)
    except KillError:
      break
    except CatchableError as e:
      error "Message caused exception", msg = msg.get()[], error = e.repr

    if hasPendingOperations():
      drain(data.sleepMs)
    else:
      sleep(data.sleepMs)
I would love to, but my core design so far does not look like it is even theoretically possible with chronos.
My server loop (see the last code example) fundamentally relies on being able to poll a channel for messages, as well as calling poll when async work is available.
Chronos does not offer this. Chronos has an eternally ongoing poll call and no equivalent to hasPendingOperations().
If I could've, I would've jumped, but it appears to be impossible from my point of view to write custom event-loops with it.
asyncfutures.nim:
proc addCallback*[T](future: Future[T],
                     cb: proc (future: Future[T]) {.closure, gcsafe.}) =
  ## Adds the callbacks proc to be called when the future completes.
  ##
  ## If future has already completed then `cb` will be called immediately.
  future.addCallback(
    proc() =
      cb(future)
  )

proc `callback=`*(future: FutureBase, cb: proc () {.closure, gcsafe.}) =
  ## Clears the list of callbacks and sets the callback proc to be called when the future completes.
  ##
  ## If future has already completed then `cb` will be called immediately.
  ##
  ## It's recommended to use `addCallback` or `then` instead.
  future.clearCallbacks
  future.addCallback cb
Is one offender, as the variant taking cb: proc (future: Future[T]) is mapped to the worse version that lacks the future parameter and thus creates an unnecessary closure. This is one source of the cycles, IIRC, and it's relatively easy to fix; the compiler's error messages will guide you.
It might break compat, but then so be it; it's worth it. We can at least offer some switch to keep code working.
Chronos has an eternally ongoing poll call that never times out
Chronos' poll returns after it has processed one round of callbacks - thus, you can use a timer to make it return on a regular basis, like so (waitFor calls poll):
import chronos

proc work(v: int) {.async.} =
  echo "before ", v
  await sleepAsync(1.seconds)
  echo "after ", v

var v = 0
while true:
  waitFor sleepAsync(500.milliseconds)
  echo "Starting task"
  asyncSpawn work(v)
  inc v
Is one offender
This one doesn't really matter greatly - the worst problem in asyncdispatch is that every async call creates a cyclic reference between the future and the closure environment of the iterator whose body is generated by the async macro. One needs a fair bit of rewriting to eliminate this.
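To make the shape of that cycle concrete, here is a simplified sketch (a made-up FakeFuture type, not the real asyncdispatch internals):

```nim
type
  FakeFuture = ref object
    ## Simplified stand-in for Future[T]; not the real type.
    callbacks: seq[proc () {.closure.}]

proc demoCycle(): FakeFuture =
  let fut = FakeFuture()
  # The closure environment captures `fut`, and the closure is stored
  # inside `fut` itself: fut -> callback -> env -> fut.
  # --mm:arc has no cycle collector, so this is never freed;
  # --mm:orc would detect and break the cycle.
  fut.callbacks.add(proc () = echo fut.callbacks.len)
  fut

discard demoCycle() # leaks under --mm:arc
```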
The way the current setup works it must be possible to have while-loop iterations that don't spawn more async work, as well as while-loop iterations that don't do (and don't wait for) async work. For that I lack the creativity on how it could work with chronos, as it requires a hasPendingOperations()-like proc (from what I can imagine so far) which chronos does not have.
I'm not entirely sure what you want to achieve to be honest as your two examples point in slightly different directions:
Here's a way to structure the code so the channel is checked for messages every 10 milliseconds, no matter whether there's ongoing async work or not:
proc readWork() =
  var msg: Option[Msg] = data.hub.readMsg(Msg)
  while msg.isSome():
    asyncSpawn processWork(msg)
    msg = data.hub.readMsg(Msg)

const pollInterval = 10.milliseconds
while keepGoing():
  waitFor sleepAsync(pollInterval)
  readWork()
If you don't want the message queue to be checked while there is async work ongoing, the code becomes:
proc readWork(): seq[Future[void]] =
  var msg: Option[Msg] = data.hub.readMsg(Msg)
  while msg.isSome():
    result.add processWork(msg)
    msg = data.hub.readMsg(Msg)

while keepGoing():
  let work = readWork()
  if work.len > 0:
    for w in work: waitFor w
  else:
    waitFor sleepAsync(pollInterval)
The hasPendingOperations approach, as well as all the above examples, is fundamentally flawed though, as is the idea of using sleep in general. This is part of the reason why chronos doesn't have a hasPendingOperations function: it promotes a poor pattern of execution with many gotchas, and if you're reaching for it, something probably went wrong before you got to that point.
Every time you introduce a sleep or other forms of polling to make things work, that's likely an inefficiency; the only time you would do that is when you don't have a choice (for example because you don't control the source channel, or its notification mechanism is not adapted to the event loop).
The way this is solved in efficient systems is that every message queue / channel comes with a signalling mechanism to tell the event loop to wake up because there is a new message to process.
In chronos, this is AsyncEvent or ThreadSignal depending on whether it's a multithreaded scenario or not (the latter works for both) - basically, every time you put a message on the queue, you fire the event so the event loop wakes up:
var signal = ThreadSignalPtr.new()[]

proc addMessage(m: Msg) =
  data.hub.add m
  # Tell the loop there is new data
  signal.fireSync()

proc readWork() =
  var msg: Option[Msg] = data.hub.readMsg(Msg)
  while msg.isSome():
    asyncSpawn processWork(msg)
    msg = data.hub.readMsg(Msg)

while keepGoing():
  waitFor signal.wait()
  readWork()
The above code is optimal in that there is no polling so the code will only do work when there's work to do - it'll also never sleep unnecessarily when it's already known that there's work arriving on the channel etc.
Of course, you don't really want to be using asyncCheck / asyncSpawn in a production system due to the shoddy exception handling (unless you make sure to catch all exceptions in each task), but that's a different story.
For anyone reading the above snippets and wondering how come the async tasks get processed at all: waitFor someFuture will advance all ongoing async work (including "detached" work like that of asyncSpawn), but return only when the particular given future is finished.
It is unusual in async applications to use asyncSpawn and indeed poll directly, except in toy examples and scripts where error handling isn't done / crashing back to the shell is fine.
Typical chronos applications have a single main(): Future[void] and are run with waitFor main() just like a "normal" application has a single non-async main entry point (hidden in the case of Nim, but still ..).
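In other words, the usual skeleton looks roughly like this (a minimal sketch, the body being just a placeholder):

```nim
import chronos

proc main() {.async.} =
  # All application logic lives inside the async world:
  # spawn tasks, await them, and handle their errors here.
  await sleepAsync(10.milliseconds)

when isMainModule:
  waitFor main() # drives the event loop until main finishes
```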
Anyway, this was all a bit of a detour from the original question in the thread. No idea how to solve this problem with asyncdispatch, if at all possible.
I'm currently approaching this with the mentality that not all message-processing is async, (only the messages that trigger IO are essentially processed async) and thus there are some messages that produce async work and some that don't.
What I fundamentally want to achieve is thus a loop with the properties I described earlier.
I think I'll need a bit to see what that looks like if I reframe it as "every message processing happens asynchronously".
I think this might be leading me down a road that could actually work a lot better than what I currently have.
Say you have a threadServer (that is, a long-living thread that exists besides your main thread that you can send messages to and receive messages from).
You could also have a third thread, a "helper" thread, whose sole task is to blockingly read messages from the threadServer's channel and transform them into async work that it puts on the threadServer's async-work-queue. That way, when no message is in the channel, the "helper" thread blocks on the read, spinning down into a more energy-efficient state.
Meanwhile, the threadServer itself also only does one thing: read from the async-work-queue and work through it. Message processing happens there, and if that spawns more async events (because async IO happens or some such), that's not a problem, since that thread only processes async events.
It took me way too long to fully understand what you wrote. I think the key thing that I read, but that didn't properly register in my mind, was the fact that waitFor-ing on anything will do async work on everything.
That was the puzzle piece I was missing, since I wanted to do some async work occasionally here or there, but not to the point that it might stop me from immediately responding to messages.
I wrote myself a small example of how I'd set it up (basically a compileable version of the example you wrote) and will likely end up translating it into my actual project. It sends a batch of messages, signals the other thread to work through that batch, and then waitFors on the signal again; the echoes in between the batches prove that async work is being done even though the waitFor is on the signal.
import chronos
import chronos/threadsync
import std/[os, sequtils, atomics]
import threading/channels

var chan = newChan[int](50)
let signal = ThreadSignalPtr.new()[]
var thread = Thread[void]()
var keepRunning: Atomic[bool]
keepRunning.store(true)

proc senderThread() {.thread.} =
  for i in 0..5:
    for y in i*3..(i+1)*3:
      chan.send(y)
    echo signal.fireSync()
    sleep(2000)
  keepRunning.store(false)
  echo signal.fireSync()

proc process1Message(x: int) {.async.} =
  echo "before ", x
  await sleepAsync(1.seconds)
  echo "after ", x

proc main() =
  thread.createThread(senderThread)
  var msg: int
  while keepRunning.load():
    echo "\nNEW LOOP"
    while chan.tryRecv(msg):
      try:
        asyncSpawn msg.process1Message()
      except CatchableError as e:
        echo "Failed for message: ", msg, " - ", e.msg
    waitFor signal.wait()
  joinThread(thread)
  echo "FINISHED"

main()