Dear community, I am experimenting with a multithreaded program that parses documents in parallel. I would like to have a shared "list" or sequence to which multiple threads can add strings, protected by a simple lock. However, I can't find a way to do this without the compiler complaining about thread safety.
I was searching for a standard thread-safe queue, but I am confused about what to use.
A high-level, non-working example of what I mean:
import locks

var
  thr: array[0..3, Thread[seq[string]]] # array of 4 threads
  sharedSomething: seq[string] # all urls loaded from stdin
  L: Lock

proc threadFunc(partialUrls: seq[string]) {.thread.} =
  acquire(L) # lock the shared sequence
  # Push to sharedSomething while it's locked
  release(L)

when isMainModule:
  initLock(L)
  # start the worker threads
  for i in 0..high(thr):
    createThread(thr[i], threadFunc, ...)
  joinThreads(thr)
  deinitLock(L)
Regarding the shared "list" or sequence:
When you only need such a simple container to collect the results, why not use one per thread and join the contents at the end? That should be faster, as the threads do not block each other. For more complicated containers, I think Nim has shared tables with locks.
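The one-container-per-thread idea could look roughly like the sketch below. It assumes ARC or ORC (so all threads share one heap) and, on Nim 1.x, --threads:on. The names WorkerArg, fillOwnList and collectPerThread are made up for illustration, and the "parsing" is just a stand-in that produces three strings per thread.

```nim
# Sketch only: each thread fills its own seq, results are merged after joining.
# Thread and createThread come from system, as in the other examples here.
when not (defined(gcArc) or defined(gcOrc)):
  {.error: "this sketch assumes --gc:arc/--gc:orc (or --mm:arc/--mm:orc)".}

const numThreads = 4

type
  # Hypothetical argument type: the thread's index plus a pointer to its own list.
  WorkerArg = tuple[id: int, res: ptr seq[string]]

proc fillOwnList(arg: WorkerArg) {.thread.} =
  # Only this thread writes to arg.res[], so no lock is needed.
  for i in 0 ..< 3:
    arg.res[].add("thread " & $arg.id & " item " & $i)

proc collectPerThread(): seq[string] =
  var
    thr: array[numThreads, Thread[WorkerArg]]
    partial: array[numThreads, seq[string]]  # one private list per thread
  for i in 0 ..< numThreads:
    let arg: WorkerArg = (id: i, res: addr partial[i])
    createThread(thr[i], fillOwnList, arg)
  joinThreads(thr)           # after this, no thread touches partial any more
  for part in partial:       # merge in thread order
    result.add(part)

when isMainModule:
  echo collectPerThread().len
```

Because the merge happens only after joinThreads, the main thread never reads a list that a worker is still writing to.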
@lqdev's suggestion of channels would probably be the best for your use case. They're essentially a queue. Just be careful to only pass simple data that gets cloned, or to not keep references to the data you send via a channel (if you're using ARC/ORC). There's isolate, which will check the data in the future, but channels still work now with a bit of care.
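As a minimal sketch of the channel approach, assuming the built-in Channel[T] from system (compile with --threads:on on Nim 1.x): the workers only send plain strings, and the main thread drains the queue after joining. The names results, parseWorker and runAll are hypothetical, and the payload is a placeholder for real parsing output.

```nim
# Sketch only: workers push strings into a shared channel, main collects them.
var results: Channel[string]    # unbounded queue when opened with defaults

proc parseWorker(id: int) {.thread.} =
  # Stand-in for real parsing: send two strings per worker.
  for i in 0 ..< 2:
    results.send("doc-" & $id & "-" & $i)

proc runAll(): seq[string] =
  var thr: array[4, Thread[int]]
  results.open()
  for i in 0 .. high(thr):
    createThread(thr[i], parseWorker, i)
  joinThreads(thr)               # every send has completed once this returns
  while true:
    let (ok, msg) = results.tryRecv()
    if not ok: break             # queue is empty
    result.add(msg)
  results.close()

when isMainModule:
  echo runAll().len
```

Joining first keeps the example simple, but it also means every message sits in the channel until all workers are done, so it only suits small result sets.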
@dawkot, I tried sharedlist, but you only get iterAndMutate[A], which did not seem to fit using it as a queue. You could, however, add to it from the workers and do a single iterAndMutate at the end.
If you have a producer-consumer pipeline, channels are the way to go. If you need random access, an example solution is below.
There are two important things to notice:
doAssert(compileOption("gc", "arc") or compileOption("gc", "orc"))
import locks, random

var
  thr: array[0..3, Thread[void]] # array of 4 threads
  l: Lock
  sharedSomething {.guard: l.}: seq[string] # may only be accessed while l is held
  a: int # shared counter, only modified under the lock

proc threadProc() {.thread.} =
  while true: # runs forever; this is only a demo
    let r = rand(100)
    for i in 0 .. r:
      withLock(l):
        {.gcsafe.}: # we promise the compiler that this access is safe
          if i mod 3 == 0 and sharedSomething.len() > 0:
            sharedSomething.del(0)
          else:
            sharedSomething.add($a)
            inc(a)
          # check len > 0 so that indexing never hits an empty seq
          if sharedSomething.len() > 0 and sharedSomething.len() mod 10000 == 0:
            echo "len: ", sharedSomething.len(), ", head: ", sharedSomething[sharedSomething.high]

when isMainModule:
  randomize()
  initLock(l)
  for i in 0 .. high(thr):
    createThread(thr[i], threadProc)
  joinThreads(thr)
  deinitLock(l)
Thank you all for the great advice! I started to play with channels this morning and they work well. The only logic I still have to work out is how to detect, on the receiving end of the channel, that all threads have finished.
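One possible way to detect that, as a sketch rather than the definitive answer: let every worker send a final "done" marker and have the receiver count markers until it has seen one per worker. The Msg type, inbox, sendingWorker and receiveUntilDone are names invented for this example; it again assumes the built-in Channel[T] and, on Nim 1.x, --threads:on.

```nim
# Sketch only: a sentinel message per worker tells the receiver when to stop.
const numWorkers = 4

type
  Msg = object
    done: bool     # true means "this worker has finished"
    text: string   # payload, unused when done is true

var inbox: Channel[Msg]

proc sendingWorker(id: int) {.thread.} =
  for i in 0 ..< 3:
    inbox.send(Msg(text: "w" & $id & ":" & $i))
  inbox.send(Msg(done: true))    # always the last message from this worker

proc receiveUntilDone(): seq[string] =
  var thr: array[numWorkers, Thread[int]]
  inbox.open()
  for i in 0 ..< numWorkers:
    createThread(thr[i], sendingWorker, i)
  var finished = 0
  while finished < numWorkers:
    let msg = inbox.recv()       # blocks until a message arrives
    if msg.done:
      inc finished
    else:
      result.add(msg.text)
  joinThreads(thr)
  inbox.close()

when isMainModule:
  echo receiveUntilDone().len
```

Since the channel is first-in, first-out and each worker sends its marker last, all of a worker's payloads have been received by the time its marker is counted. Unlike joining first and draining afterwards, the receiver here can process results while the workers are still running.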
@Allin, thanks for the nice example. I actually started with similar logic but never got the code to work at all, so this example is really helpful.