I've spent a few days on this and boiled this down to a minimal example. Can an expert with Nim parallelism please tell me where I'm going wrong here?
Output changes from run to run (is stochastic) but I did not intend that and do not see how this can be. Here are several runs in a row of a very simple thread pool algorithm that adds 1 to each element of a seq every time a job is submitted to the pool.
1040.0 == 1000.0 (false)
1020.0 == 1000.0 (false)
1039.0 == 1000.0 (false)
1000.0 == 1000.0 (true) <-- the intended result
1069.0 == 1000.0 (false)
1040.0 == 1000.0 (false)
995.0 == 1000.0 (false)
Minimal Example
import locks
from math import sum
from strformat import `&`
from std/decls import byaddr

const NTHREADS = 8 # pool size
const NSUBMISSIONS = 100 # number of jobs to submit to pool
const NVALUES = 10 # length of reduction data seq

type
  ThreadComm = enum Begin Ready Quit
  ThreadParams = tuple
    threadDataPtr: ptr ThreadData
    resultsPtr: ptr Reduction
  ThreadData = object
    input, output: Channel[ThreadComm]
    handle: Thread[ThreadParams]
  Reduction = object
    values: seq[float]
    lock: Lock

proc singleThread(args: ThreadParams) =
  let threadData {.byaddr.} = args.threadDataPtr[]
  let results {.byaddr.} = args.resultsPtr[]
  threadData.output.send(ThreadComm.Ready)
  while true:
    let command = threadData.input.recv() # 'Begin' or 'Quit' received
    if command == ThreadComm.Quit: break
    acquire results.lock
    for i in 0..<results.values.len:
      results.values[i] += 1.0
    release results.lock
    threadData.output.send(ThreadComm.Ready)

proc main =
  var pool = newSeq[ThreadData](len=NTHREADS)
  var reduction = Reduction(values: newSeq[float](len=NVALUES)) # target for reduce operation
  for thread in pool.mitems:
    open thread.input
    open thread.output
    thread.handle.createThread( singleThread, param=(addr thread, addr reduction) )
  # work submission loop
  var workID: Natural
  while workID < NSUBMISSIONS:
    for thread in pool.mitems:
      let channel = thread.output.tryRecv() # no data if thread not done yet
      if channel.dataAvailable: # assume state == ThreadComm.Ready if anything received
        inc workID
        thread.input.send(ThreadComm.Begin)
  # wrap up
  for thread in pool.mitems:
    thread.input.send(ThreadComm.Quit)
  for thread in pool.mitems:
    joinThread thread.handle
    close thread.input
    close thread.output
  # report
  const expected = 1000.0
  let total = sum(reduction.values)
  let TorF = total == expected
  echo &"{total} == {expected} ({TorF})"

when isMainModule:
  main()
The goal is to use this minimal thread pool implementation, not a library. Yes, I'm aware of the libraries and have used them all. I'd love to understand why this doesn't work as I intend because it's rather basic. Please help me move along to the facepalm moment :)
There are a couple of logic flaws in your job pool example which are causing your issues. Nothing particularly Nim specific, but here's a breakdown.
while workID < NSUBMISSIONS: # This will run ceil(NSUBMISSIONS / NTHREADS) * NTHREADS items
  for thread in pool.mitems:
    let channel = thread.output.tryRecv() # Need to retry this until done for each loop or block
    if channel.dataAvailable: # assume state == ThreadComm.Ready if anything received
      inc workID
      thread.input.send(ThreadComm.Begin)
echo "workId: ", workId # added this to print actual number of jobs run
The first is how you're divvying up the number of jobs. You have a thread pool size of 8, which doesn't divide evenly into your 100 submissions. The logic in your loop will try and submit ceil(100 / 8).int * 8 = 104 jobs.
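To make the arithmetic concrete, here is a small single-threaded sketch. It assumes the worst case where every thread reports Ready on every pass over the pool; the proc names are made up for illustration and are not part of the original code. It also shows what a bound check inside the inner loop does to the count.

```nim
import std/math

const NTHREADS = 8
const NSUBMISSIONS = 100

proc worstCaseSubmissions(nSubmissions, nThreads: int): int =
  ## Closed form: whole passes over the pool, rounded up.
  ceil(nSubmissions / nThreads).int * nThreads

proc simulateUnchecked(nSubmissions, nThreads: int): int =
  ## Mirrors the shape of the original loop: the bound is only tested
  ## between passes, so a pass that has started always runs to the end.
  while result < nSubmissions:
    for _ in 0 ..< nThreads:
      inc result

proc simulateChecked(nSubmissions, nThreads: int): int =
  ## Same loop, with the bound also tested before each submission.
  while result < nSubmissions:
    for _ in 0 ..< nThreads:
      if result >= nSubmissions: break
      inc result

echo worstCaseSubmissions(NSUBMISSIONS, NTHREADS) # 104
echo simulateUnchecked(NSUBMISSIONS, NTHREADS)    # 104
echo simulateChecked(NSUBMISSIONS, NTHREADS)      # 100
```

With tryRecv in the real loop, not every thread is ready on every pass, so the actual overshoot can be anywhere from 0 to NTHREADS - 1 extra jobs.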
The second logic error introduces the stochastic failure. The thread.output.tryRecv() can run before individual threads are finished. You would need to keep track of the finished threads, and keep retrying the unfinished threads. Alternatively, you can use thread.output.recv() instead, which will block until each thread is done per cycle. After modifying your code to use recv, it outputs 1040 every time, with 104 work units done.
In other words, thread pools are tricky and you'd need to rethink your logic for how you submit work to threads and wait for them to finish. Usually you'll have one channel (queue) shared between multiple workers, which will use system primitives or other methods to let only one thread grab a work chunk at a time.
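A rough sketch of that shared-queue layout, not a drop-in replacement for your pool: all workers block on one Channel, and the channel's own locking ensures each message is handed to exactly one worker. The names Job, Shared, worker and runSharedQueue are invented for this example. On Nim 1.x it needs --threads:on.

```nim
import std/locks

const NTHREADS = 8
const NSUBMISSIONS = 100
const NVALUES = 10

type
  Job = enum DoWork, Stop      # hypothetical message type for this sketch
  Shared = object
    jobs: Channel[Job]         # the single queue every worker reads from
    values: seq[float]
    lock: Lock

proc worker(shared: ptr Shared) {.thread.} =
  while true:
    # recv blocks until a message arrives; each message goes to one worker only
    let job = shared.jobs.recv()
    if job == Job.Stop: break
    acquire shared.lock
    for i in 0 ..< shared.values.len:
      shared.values[i] += 1.0
    release shared.lock

proc runSharedQueue(): float =
  var shared: Shared
  shared.values = newSeq[float](NVALUES)
  initLock shared.lock
  open shared.jobs
  var threads: array[NTHREADS, Thread[ptr Shared]]
  for t in threads.mitems:
    createThread(t, worker, addr shared)
  # Exactly NSUBMISSIONS jobs go in, then one Stop per worker.
  for _ in 0 ..< NSUBMISSIONS:
    shared.jobs.send(Job.DoWork)
  for _ in 0 ..< NTHREADS:
    shared.jobs.send(Job.Stop)
  joinThreads threads
  close shared.jobs
  deinitLock shared.lock
  for v in shared.values:
    result += v

echo runSharedQueue() # 1000.0
```

Because the channel is first-in, first-out, the Stop messages are only received after every DoWork message has been taken, and each worker exits after consuming one Stop. The submitter never has to poll or track which thread is idle.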
The logic in your loop will try and submit...
Ah, good catch with the while workID < NSUBMISSIONS submission loop, thank you! I tried too hard to make a tidy minimal example and introduced this bug thinking I could get rid of my break condition for cleanliness. Edited the code and output above to reflect this fix.
The thread.output.tryRecv() can run before individual threads are finished. You would need to keep track of the finished threads, and keep retrying the unfinished threads.
So my understanding is that the whole point of tryRecv is exactly that it can run before threads are finished, and it will tell you in the dataAvailable field if a message was received or not. So what I do is poll the thread pool until I find a thread that has indicated it is ready for more work. In this way I am tracking what threads are finished and querying them the next time I ask around the entire thread pool. I'm apparently blind to my own code by now so if this is not what I've written in code, then please tell me.
Other understandings I have of tryRecv that might be wrong: tryRecv empties the message in the channel, so it cannot receive the same message twice. Also, channels are atomic, so a thread writing a message and another thread receiving a message on the same channel cannot have a race condition. tryRecv result field of .dataAvailable will be true when a message was successfully read and emptied from the channel.
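Those assumptions can be checked in isolation with a single-threaded sketch like the one below (tryRecvDemo is a made-up name; on Nim 1.x compile with --threads:on so that Channel is available).

```nim
proc tryRecvDemo(): seq[bool] =
  var chan: Channel[int]
  open chan
  # Nothing sent yet: tryRecv returns right away with dataAvailable == false.
  result.add chan.tryRecv().dataAvailable
  chan.send(42)
  # One message queued: tryRecv removes it and reports it in .msg.
  let first = chan.tryRecv()
  result.add first.dataAvailable and first.msg == 42
  # The message was consumed, so it cannot be received a second time.
  result.add chan.tryRecv().dataAvailable
  close chan

echo tryRecvDemo() # @[false, true, false]
```

One caveat: tryRecv may also report dataAvailable == false when it cannot get the channel's internal lock, even though a message is queued. For a polling loop that just means the message is picked up on a later pass.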
SOLVED
So, after much debugging... Always use initLock on your locks. Don't ever forget it otherwise the lock will "work" without actually doing what it's supposed to. You might also need deinitLock then.
So, after much debugging... Always use initLock on your locks. Don't ever forget it otherwise the lock will "work" without actually doing what it's supposed to. You might also need deinitLock then.
Oops, sorry I forgot that I'd changed that too. Yes, definitely init them (and conds as well) as uninitialized locks may have strange undefined behaviors.
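For reference, a minimal sketch of the lock lifecycle on its own, stripped of the channel machinery. NROUNDS, addOnes and runLocked are names invented for this example; the Reduction type is the one from the question. On Nim 1.x compile with --threads:on.

```nim
import std/locks

const NTHREADS = 8
const NROUNDS = 1000   # increments per thread, chosen arbitrarily
const NVALUES = 10

type
  Reduction = object
    values: seq[float]
    lock: Lock

proc addOnes(r: ptr Reduction) {.thread.} =
  for _ in 0 ..< NROUNDS:
    acquire r.lock
    for i in 0 ..< r.values.len:
      r.values[i] += 1.0
    release r.lock

proc runLocked(): float =
  var reduction = Reduction(values: newSeq[float](NVALUES))
  initLock reduction.lock     # must happen before any thread calls acquire
  var threads: array[NTHREADS, Thread[ptr Reduction]]
  for t in threads.mitems:
    createThread(t, addOnes, addr reduction)
  joinThreads threads
  deinitLock reduction.lock   # only after every thread has been joined
  for v in reduction.values:
    result += v

echo runLocked() # 80000.0
```

Every thread adds 1.0 to each of the 10 values 1000 times, so with a working lock the total is 8 * 1000 * 10 = 80000.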
In this way I am not needing to track state, because tryRecv does the tracking of finished or not finished for me, mostly because the last thing a thread does is send 'Ready' into the channel when it's done with its work. I'm apparently blind to my own code by now so if this is not what I've written in code, then please tell me.
You're sort of correct here. Though you'd likely need to increment workID count when submitting a task and not after checking tryRecv. There are many other ways, but currently you're incrementing and checking workID after receiving a result, which won't be deterministic.
you'd likely need to increment workID count when submitting a task and not after checking tryRecv
For anyone else reading this, we're talking past each other here I suspect, because the code does do the right thing once an initLock is added. What happens for submission is it only submits jobs to threads that are ready for jobs - thus the check for receiving a "Ready" message through tryRecv and why workID is only incremented upon a job submission after receiving a response from the thread, which is the "ready for work" message. With initLock, results at the end are deterministic and all is good. Yay!
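For anyone who wants to see it in one piece, here is a sketch of the pool with both fixes from this thread applied: initLock on the reduction lock, and a bound check inside the polling loop so that exactly NSUBMISSIONS jobs are submitted. This is my reading of the thread rather than the exact final code; runPool is a made-up wrapper so the total can be returned, and the byaddr declarations are replaced by plain pointers to keep the example short. On Nim 1.x compile with --threads:on.

```nim
import std/locks

const NTHREADS = 8
const NSUBMISSIONS = 100
const NVALUES = 10

type
  ThreadComm = enum Begin, Ready, Quit
  ThreadParams = tuple
    threadDataPtr: ptr ThreadData
    resultsPtr: ptr Reduction
  ThreadData = object
    input, output: Channel[ThreadComm]
    handle: Thread[ThreadParams]
  Reduction = object
    values: seq[float]
    lock: Lock

proc singleThread(args: ThreadParams) {.thread.} =
  let threadData = args.threadDataPtr
  let results = args.resultsPtr
  threadData.output.send(ThreadComm.Ready)     # announce "ready for work"
  while true:
    let command = threadData.input.recv()      # 'Begin' or 'Quit'
    if command == ThreadComm.Quit: break
    acquire results.lock
    for i in 0 ..< results.values.len:
      results.values[i] += 1.0
    release results.lock
    threadData.output.send(ThreadComm.Ready)   # done, ready for the next job

proc runPool(): float =
  var pool = newSeq[ThreadData](NTHREADS)
  var reduction = Reduction(values: newSeq[float](NVALUES))
  initLock reduction.lock                      # fix 1: initialize the lock
  for thread in pool.mitems:
    open thread.input
    open thread.output
    createThread(thread.handle, singleThread, (addr thread, addr reduction))
  var workID = 0
  while workID < NSUBMISSIONS:
    for thread in pool.mitems:
      if workID >= NSUBMISSIONS: break         # fix 2: stop mid-pass at the bound
      if thread.output.tryRecv().dataAvailable:
        inc workID                             # counted only when a job is sent
        thread.input.send(ThreadComm.Begin)
  for thread in pool.mitems:
    thread.input.send(ThreadComm.Quit)
  for thread in pool.mitems:
    joinThread thread.handle
    close thread.input
    close thread.output
  deinitLock reduction.lock
  for v in reduction.values:
    result += v

echo runPool() # 1000.0
```

The submission loop busy-polls, which is fine for a minimal example but burns a core while the workers are busy.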