Hi, I have a code sample on github gist: https://gist.github.com/sepisoad/f61e4127f6ca5d0b14da
There I have a function called busyLoop that checks for a certain condition in a loop; when the condition is met, the loop ends. Generally this approach is not nice and could be replaced with an elegant event loop, but I'm wondering whether an event loop is possible in Nim, and if so, how can we do it?
Hi, I've downloaded your code, run it, read channels.nim in Nim's source, searched for ways to listen to several pthread conditions at once, etc. I think I have a proposal.
In your code, if you replace tryRecv() with recv(), then it will wait for both threads without sleeping. However, if you want to react to whichever message arrives first, that is currently not possible, and I guess that is what you want. For comparison, in Go you can listen on several channels and timers and get notified about the first event. The current implementation of channels.nim uses pthreads (I can only speak about Linux, not about Windows), and the receiving thread gets notified via the pthread_cond_wait(...) function. According to a quick Google search, it is not possible to listen to several pthread conditions at once from one thread. Theoretically we could use one condition for all channels, but sharing that condition's mutex for all channel transfers might be a bad idea: if large data is written to one channel, we shouldn't block all the other channels for that time.
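For the "wait for both" case, roughly this is what I mean (the channel names are just placeholders, not the ones from your gist; compile with --threads:on):

type
  StringChannel = TChannel[string]

var chanA, chanB: StringChannel   # placeholders for the gist's two channels

proc waitForBoth() =
  # recv() blocks on the channel's condition variable, so no busy loop
  # and no sleep() is needed when both results are wanted anyway.
  # (The channels must be open()ed before use, as in your code.)
  let msgA = chanA.recv()
  let msgB = chanB.recv()
  echo msgA
  echo msgB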
My proposal is to introduce one additional condition and its own mutex. This new mutex would not be used by any channel for protecting its data, but a signal would be sent on this new condition after each channel transfer. This way you could wait until the next event (which can come from any channel in your program, not only from those currently under watch), and then use tryRecv() to check whether the signal came from one of the channels being watched. You might miss a signal on this condition while you are in the loop of tryRecv() checks, but that seems solvable with an extra counter variable.
What do you think?
Peter
First, it is not recommended to mix spawn with channels: spawn uses a thread pool, and if there aren't enough threads in the pool, waiting for a channel may deadlock under certain conditions. To work with spawn, use flow variables; use createThread in combination with channels.
Second, it is not clear what condition you are waiting for. If you are waiting for both channels to have a value available, simply wait on them in any order: retrieve one value from one, then one from the other, then print.
If you actually want to wait for one or the other (whichever comes first), there are a number of solutions. The easiest one is to send both to the same channel (though you may then need a channel parameterized by a more complex type, e.g. (int, string) or (someEnum, string) rather than just string, in order to encode the source of the message).
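A rough sketch of that single-channel variant (all the names here are invented for illustration; compile with --threads:on):

type
  TaskId = enum tidA, tidB
  TaskMsg = tuple[id: TaskId, text: string]

var results: TChannel[TaskMsg]    # one channel shared by both workers

proc workerA() {.thread.} =
  results.send((id: tidA, text: "TASK_A_FIN"))

proc workerB() {.thread.} =
  results.send((id: tidB, text: "TASK_B_FIN"))

proc main() =
  results.open()
  var threads: array[2, TThread[void]]
  createThread[void](threads[0], workerA)
  createThread[void](threads[1], workerB)
  # recv() returns whichever message arrives first; the enum tag
  # tells us which worker sent it.
  let (id, text) = results.recv()
  echo "first finisher: ", id, " -> ", text
  joinThreads(threads)
  results.close()

main()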
Did you want something like this? I've modified busyLoop() to use createThread; the original logic was also broken (if busyLoop() read data from only one of the channels, it would hang forever). So here is the code, without any sleep in busyLoop():
import os
import threadpool

type
  stringChannel = TChannel[string]

var
  chanTaskA: stringChannel
  chanTaskB: stringChannel

proc heavyTaskA() {.thread.} =
  for i in 1..10:
    stdout.write("@\n")
    sleep(1000)
  chanTaskA.send("TASK_A_FIN")

proc heavyTaskB() {.thread.} =
  for i in 1..10:
    stdout.write("#\n")
    sleep(1100)
  chanTaskB.send("TASK_B_FIN")

proc busyLoop() =
  var threads = newSeq[TThread[void]](2)
  createThread[void](threads[0], heavyTaskA)
  createThread[void](threads[1], heavyTaskB)
  var taskAOK, taskBOK: bool
  var taskAMSG, taskBMSG: string
  var counter: int
  while true:
    # blocks until any channel has received a message since the last call
    waitForNextMessage(counter)
    stdout.write(".\n")
    if not taskAOK:
      var ret = chanTaskA.tryRecv()
      taskAOK = ret[0]
      taskAMSG = ret[1]
    if not taskBOK:
      var ret = chanTaskB.tryRecv()
      taskBOK = ret[0]
      taskBMSG = ret[1]
    if taskAOK and taskBOK:
      echo(taskAMSG)
      echo(taskBMSG)
      break

proc main() =
  chanTaskA.open()
  chanTaskB.open()
  busyLoop()
  chanTaskA.close()
  chanTaskB.close()

main()
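(For reference: this needs the patched channels.nim shown further down and the threaded runtime, i.e. compile with --threads:on.)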
What do you think? It could be fancier with a macro, like for the Go channels. The call waitForNextMessage(counter) blocks until a message has been sent to any channel since the last waitForNextMessage(counter) call. I had to add these lines to lib/system/channels.nim:
type
  TMessageCounter {.pure, final.} = object
    lock: TSysLock
    cond: TSysCond
    counter: int

var MessageCounter = TMessageCounter()
initSysLock(MessageCounter.lock)
initSysCond(MessageCounter.cond)

proc waitForNextMessage*(counter: var int) =
  acquireSys(MessageCounter.lock)
  if MessageCounter.counter == counter:
    waitSysCond(MessageCounter.cond, MessageCounter.lock)
  counter = MessageCounter.counter
  releaseSys(MessageCounter.lock)
And replace send* in the same file with:
proc send*[TMsg](c: var TChannel[TMsg], msg: TMsg) =
  ## sends a message to a thread. `msg` is deeply copied.
  var q = cast[PRawChannel](addr(c))
  sendImpl(q)
  acquireSys(MessageCounter.lock)
  inc MessageCounter.counter
  broadcastSysCond(MessageCounter.cond)
  releaseSys(MessageCounter.lock)
And that's it. I almost forgot about some extra lines in lib/system/syslocks.nim (just the Linux part; I can't finish the Windows part):
proc broadcastSysCond(cond: var TSysCond) {.
  importc: "pthread_cond_broadcast", header: "<pthread.h>", noSideEffect.}
The proposed solution above adds overhead to every channel send call. We could use a symbol with -d to switch this functionality on. In that case, a call to waitForNextMessage() without -d:someSymbol given on the command line would generate a compile error with a message explaining that the symbol needs to be defined.
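Roughly what I have in mind (the symbol name channelEvents is just made up; the send() hook would be wrapped the same way):

when defined(channelEvents):
  proc waitForNextMessage*(counter: var int) =
    # real body as in the channels.nim patch above
    discard
else:
  # any use of waitForNextMessage without -d:channelEvents becomes
  # a compile-time error instead of silently slowing every send() down
  proc waitForNextMessage*(counter: var int) {.error:
    "compile with -d:channelEvents to use waitForNextMessage".}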
What do you think? Could a solution like this become part of the channel code? It would stay fast when the feature is not used (Nim can be a systems language).
Peter
@mora Please create a proper pull request for this solution so we can discuss it. It looks convoluted at first sight but I'm sure I am missing lots of details. For example, threadpool.awaitAny uses this implementation:
type
  AwaitInfo = object
    cv: Semaphore
    idx: int

  FlowVarBase* = ref FlowVarBaseObj ## untyped base class for 'FlowVar[T]'
  FlowVarBaseObj = object of RootObj
    # ...
    # for 'awaitAny' support
    ai: ptr AwaitInfo
    idx: int

proc awaitAny*(flowVars: openArray[FlowVarBase]): int =
  ## awaits any of the given flowVars. Returns the index of one flowVar for
  ## which a value arrived. A flowVar only supports one call to 'awaitAny' at
  ## the same time. That means if you await([a,b]) and await([b,c]) the second
  ## call will only await 'c'. If there is no flowVar left to be able to wait
  ## on, -1 is returned.
  ## **Note**: This results in non-deterministic behaviour and so should be
  ## avoided.
  var ai: AwaitInfo
  ai.cv = createSemaphore()
  var conflicts = 0
  for i in 0 .. flowVars.high:
    if cas(addr flowVars[i].ai, nil, addr ai):
      flowVars[i].idx = i
    else:
      inc conflicts
  if conflicts < flowVars.len:
    await(ai.cv)
    result = ai.idx
    for i in 0 .. flowVars.high:
      discard cas(addr flowVars[i].ai, addr ai, nil)
  else:
    result = -1
  destroySemaphore(ai.cv)

proc nimFlowVarSignal(fv: FlowVarBase) =
  if fv.ai != nil:
    acquire(fv.ai.cv.L)
    fv.ai.idx = fv.idx
    inc fv.ai.cv.counter
    release(fv.ai.cv.L)
    signal(fv.ai.cv.c)
...
When this feature is not used, this is just a simple 'if'; it would use 2 additional words for every channel object and doesn't require yet another define.
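To illustrate the "2 extra words plus a simple if" idea on the channel side, here is a rough standalone sketch (DemoChannel, signalAwaiter and the Semaphore stand-in are made-up names mirroring the FlowVar code above, not the real channels.nim internals):

import locks

type
  Semaphore = object          # minimal stand-in for threadpool's Semaphore
    c: Cond
    L: Lock
    counter: int

  AwaitInfo = object
    cv: Semaphore
    idx: int

  DemoChannel = object        # stands in for the real raw channel object
    # ... the existing queue fields would live here ...
    ai: ptr AwaitInfo         # the 2 extra words: nil unless being awaited
    idx: int

proc signalAwaiter(c: var DemoChannel) =
  # what send() would do after enqueueing a message: in the common case
  # (nobody awaiting this channel) it is just one pointer comparison
  if c.ai != nil:
    acquire(c.ai.cv.L)
    c.ai.idx = c.idx
    inc c.ai.cv.counter
    release(c.ai.cv.L)
    signal(c.ai.cv.c)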
@Jehan
First, it is not recommended to mix spawn with channels: spawn uses a thread pool, and if there aren't enough threads in the pool, waiting for a channel may deadlock under certain conditions.
I intend to fix this by using awaitOrTimeout(gSomeReady) instead of await(gSomeReady) in the implementation of nimSpawn.
<rant> awaitOrTimeout is like 3 lines of code for Windows and lots of complex code on Posix ... </rant>