I have a relatively slow task that creates an array that needs to be returned to the main thread. I've created the example below, parallelized in a horrible way as a starting point. In Go, I would spin up n goroutines reading off the same channel, send each result to a waiting channel, and use a sync.WaitGroup.Done call to indicate when all was finished.
What would be the Nim-ish(?) way to parallelize the code below? Thanks.
{.experimental.}
import threadpool
import os

# this can be as large as 250 million.
const size = 59373566

## each worker will need to fill an array and return it to
## the main thread which will accumulate the results of all workers.
type MyData {.shallow.} = ref object
  arr: seq[int8]
  name: string
  i: int

proc slow_worker(o: int): MyData =
  ## example function that returns an object similar to my real use-case.
  result = MyData(arr: new_seq[int8](size), name: "v" & $o, i: o)
  # do enough work that it is slow in release mode.
  for j in 0..(7 + o mod 2):
    for i in 0..result.arr.high:
      result.arr[i] = ((result.arr[i].int + i + o) mod int8.high.int).int8

proc main() =
  const threads = 4
  # task is to parallelize calls to `slow_worker`
  var names = newSeq[string](250)
  for i in 0..names.high:
    names[i] = "part" & $i

  var accumulated = newSeq[int](size)
  var i = 0
  while i < names.len:
    var responses = newSeq[FlowVar[MyData]](min(threads, names.len - i))
    # limited to block of size `threads`. how to fill next empty thread/process
    # with work as soon as one is finished.
    parallel:
      for ti in 0..min(threads, responses.len - 1):
        responses[ti] = spawn slow_worker(i + ti)
    for r in responses:
      var d = ^r
      echo d.name, d.i
      for i, v in d.arr:
        accumulated[i] += v.int
    i += responses.len

when isMainModule:
  main()
Here's what I came up with. It seems to work pretty well at keeping the cores busy.
{.experimental.}
import threadpool
import os

# this can be as large as 250 million.
const size = 59373566

## each worker will need to fill an array and return it to
## the main thread which will accumulate the results of all workers.
type MyData {.shallow.} = ref object
  arr: seq[int8]
  name: string
  i: int

proc slow_worker(o: int, dptr: ptr MyData): bool =
  ## example function that returns an object similar to my real use-case.
  var d: MyData = dptr[]
  if d == nil:
    d = MyData()
  if d.arr == nil or d.arr.len != size:
    d.arr = newSeq[int8](size)
  else:
    zeroMem(d.arr[0].addr, sizeof(d.arr[0]) * d.arr.len)
  d.name = "v" & $o
  d.i = o
  var outer = 7
  when defined(debug):
    outer = 1
  # make heterogeneous workload
  if o mod 10 == 0:
    outer += 4
  when defined(release):
    outer += 20
  for j in 0..(outer + o mod 2):
    for i in 0..d.arr.high:
      d.arr[i] = ((d.arr[i].int + i + o) mod int8.high.int).int8
  dptr[] = d
  result = true

proc main() =
  const threads = 4
  # task is to parallelize calls to `slow_worker`
  var results = newSeq[MyData](threads)
  var responses = newSeq[FlowVarBase](threads)
  var accumulated = newSeq[int](size)
  var jobi = 0
  for ti in 0..min(threads, results.len - 1):
    responses[ti] = spawn slow_worker(ti, results[ti].addr)
  jobi += responses.len
  while len(responses) != 0:
    var index = awaitAny(responses)
    if index == -1: break
    echo "received:", results[index].name
    for i, v in results[index].arr:
      accumulated[i] += v.int
    if jobi < 80:
      #echo "sending:", jobi
      responses[index] = spawn slow_worker(jobi, results[index].addr)
    else:
      responses.del(index)
      results.del(index)
    jobi += 1

when isMainModule:
  main()
I would caution you to be careful using a ptr to refer to a ref object. This is allowed because ptr is a form of unsafe code, but it is in fact illegal to share ``ref`` types across threads. If the GC were to run while slow_worker was running, you would have a dangling pointer. You can use a Channel instead to have a safer interface, but this might be difficult, considering that your type has a seq in it.
Perhaps you could refactor your code to use a fixed sized buffer. Then the unsafe shared ref can be removed.
Hmm... That's a question I don't feel 100% about. Interestingly, this can be done for the FFI. You can use GC_ref() and GC_unref() to manually update reference counts to preserve an object for a C library. I've never seen this done within Nim, though.
If you are comfortable with manual memory management, you could make a tiny DynamicArray or Vector that uses explicit allocation and freeing instead of the GC. (You use create instead of new, and an unsafe ptr instead of the traced ref.) You are then also responsible for freeing the memory yourself.
`This article <https://nim-lang.org/docs/gc.html>`_ may be helpful. The idea of manual reference counting is presented `here <https://nim-lang.org/docs/backends.html#memory-management-strings-and-c-strings>`_.
Is the seq shared across threads? If so, you may need guards and locks to avoid race conditions.
If the seq is the combined result of separate calculations, it's pretty much safe to spawn the computations and read the values from the resulting FlowVars. You can then check whether all the computation is done with isReady together with the all proc from the sequtils module.
I got some help from @Araq in gitter. The simple change that has now worked for hundreds of long-running executions of my real code is to allocate all of what my example calls MyData, including the name and the seq, in the main thread. The workers can then set values in the already-allocated seq (and use zeroMem).
@mashingan, there is no sharing across threads. And the main thread only accesses the data when the "worker" threads have completed.
My real use of this is implemented here: https://github.com/brentp/mosfun/blob/master/src/mosfun.nim with sample_counter being the function that is called in parallel.
Now, I'm limited by this issue: https://github.com/nim-lang/Nim/issues/7894 that caps the number of threads I can use for larger chromosomes, but I can use 22 threads for the largest which is enough for now.