I am struggling with threads again. My goal is to do multi-threaded updates to sequences held in many objects.
My first attempt, using threadpool, was successful, but then I noticed that threadpool will disappear sooner or later. My attempts using Weave or malebolgia fail.
The code samples below are just examples of many attempts. (Error: 'updateCounts' can have side effects)
There certainly is a gap in my understanding to be filled.

import std/[threadpool]
{.experimental: "parallel".}

type
  Count = ref object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  new(result)
  result.n = n
  result.v = v
  result.nseq = newSeq[int](n)

func adder(c: Count) =
  {.gcsafe.}:
    for i in 0..<c.n:
      c.nseq[i] = i + c.v

func updateCounts(ncList: seq[Count])=
  parallel:
    for i in 0..<ncList.len:
      spawn ncList[i].adder()

when isMainModule:
  let n = 11
  var nclist = newSeq[Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr

import weave

type
  Count = ref object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  new(result)
  result.n = n
  result.v = v
  #result.nseq = newSeq[int](n)

func adder(c: Count) =
  {.gcsafe.}:
    for i in 0..<c.n:
      c.nseq[i] = i + c.v

func updateCounts(ncList: seq[Count]) {.noSideEffect.}=
  init(Weave)
  parallelFor i in 0..<ncList.len:
  #for i in 0..<ncList.len:
    ncList[i].adder()
    #spawn ncList[i].adder()
  exit(Weave)

when isMainModule:
  let n = 11
  var nclist = newSeq[Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr

import malebolgia
import malebolgia / ticketlocks

type
  Count = ref object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  new(result)
  result.n = n
  result.v = v
  result.nseq = newSeq[int](n)

func adder(c: ptr Count, L: ptr TicketLock)=
  {.gcsafe.}:
    withLock L[]:
      for i in 0..<c.n:
        c.nseq[i] = i + c.v

proc updateCounts(ncList: seq[Count]) {.noSideEffect.}=
  var m = createMaster()
  var L = initTicketLock()
  m.awaitAll:
    for i in 0..<ncList.len:
      m.spawn adder(addr ncList[i], addr L)
      #Error: expression cannot be isolated: ncList[i]

when isMainModule:
  let n = 11
  var nclist = newSeq[Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr
  #Error: expression cannot be isolated: ncList[i]

I believe you get that error because Count is declared as a ref object (type Count = ref object), and a ref object cannot be shared across threads.
In general, that is a problem with all the code you showed above.
When you have a block like this

{.gcsafe.}:
  [body]

you are effectively telling the Nim compiler: "I know for sure this code is gcsafe, even though you think it is not, so trust me and do not abort compilation with an error."
If you do not actually know for sure that it is gcsafe, a SIGSEGV at runtime is likely in your future.
With type Count = ref object it is not safe, because reference counting is not atomic across threads: several threads can race on the reference counter, and the program can crash.
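
To make the point about plain objects concrete, here is a minimal sketch that uses only the standard library, not threadpool, Weave, or malebolgia. It assumes Nim 2.x, or Nim 1.6 with --threads:on; the proc name fill is made up for the illustration. Count is a value object, so handing its address to a worker touches no reference count, and the owner joins the thread before it reads the data.

```nim
# Assumption: Nim 2.x, or Nim 1.6 compiled with --threads:on.
when (NimMajor, NimMinor) >= (2, 0):
  import std/typedthreads   # Thread, createThread, joinThread

type
  Count = object            # plain value object: no reference count involved
    n, v: int
    nseq: seq[int]

proc adder(c: ptr Count) {.thread.} =
  # Only writes into elements that already exist; never resizes the seq.
  for i in 0 ..< c.n:
    c.nseq[i] = i + c.v

proc fill(): Count =
  ## Hypothetical helper for this sketch.
  var c = Count(n: 4, v: 10, nseq: newSeq[int](4))
  var t: Thread[ptr Count]
  createThread(t, adder, addr c)
  joinThread(t)             # c must outlive the worker, so join before using it
  result = c

when isMainModule:
  echo fill().nseq          # prints @[10, 11, 12, 13]
```

The raw pointer is only safe here because the owner waits for the worker and nobody else touches c in the meantime; the compiler does not check that for you.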
Thank you.
The current Malebolgia state:

import malebolgia

type
  Count = object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  #new(result)
  result.n = n
  result.v = v
  result.nseq = newSeq[int](n)

func adder(c: var Count)=
  {.gcsafe.}:
    for i in 0..<c.n:
      c.nseq[i] = i + c.v

proc updateCounts(ncList: seq[var Count])=
  {.noSideEffect.}:
    var m = createMaster()
    m.awaitAll:
      for i in 0..<ncList.len:
        m.spawn adder(ncList[i])
        #Error: 'toTask'ed function cannot have a 'var' parameter

when isMainModule:
  let n = 11
  var nclist = newSeq[var Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr

and weave:

import weave

type
  Count = object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  #new(result)
  result.n = n
  result.v = v
  result.nseq = newSeq[int](n)

func adder(c: var Count) =
  {.gcsafe.}:
    for i in 0..<c.n:
      c.nseq[i] = i + c.v

func updateCounts(ncList: ref seq[var Count]) =
  {.noSideEffect.}:
    init(Weave)
    parallelFor i in 0..<ncList[].len:
      ncList[i].adder()
    exit(Weave)
    # Error: 'ncList' is of type <seq[var Count]> which cannot be captured as it would
    # violate memory safety, declared here: [...]tread01.nim(72, 19); using '-d:nimNoLentIterators'
    # helps in some cases. Consider using a <ref seq[var Count]> which can be captured.

when isMainModule:
  let n = 11
  var nclist = newSeq[var Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  #Error: type mismatch Expression: updateCounts(nclist)
  echo ncList.repr

Not finished yet.
This version compiles, but fails at runtime with SIGSEGV: Illegal storage access. (Attempt to read from nil?)

import weave

type
  Count = object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): ref Count {.inline.} =
  new(result)
  result.n = n
  result.v = v
  result.nseq = newSeq[int](n)

func adder(c: ref Count) =
  {.gcsafe.}:
    for i in 0..<c.n:
      c.nseq[i] = i + c.v

func updateCounts(ncList: ref seq[ref Count]) =
  {.noSideEffect.}:
    init(Weave)
    parallelFor i in 0..<ncList[].len:
      captures:{ncList}
      ncList[][i].adder()
    exit(Weave)

when isMainModule:
  let n = 11
  var nclist: ref seq[ref Count]
  new(nclist)
  #nclist = newSeq[ref Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr

Try this:

import malebolgia

type
  Count = object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  Count(n: n, v: v, nseq: newSeq[int](n))

func adder(c: ptr Count)=
  for i in 0..<c.n:
    c.nseq[i] = i + c.v

proc updateCounts(ncList: seq[Count])=
  var m = createMaster()
  m.awaitAll:
    for i in 0..<ncList.len:
      m.spawn adder(addr ncList[i])

when isMainModule:
  let n = 11
  var nclist = newSeq[Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr

Thank you.
Now the question: how could I have known, before I started, that I need ptr and addr rather than ref and/or var?
There were hints to it in the error messages.
Here is the fixed Weave version:

import weave

type
  Count = object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  #new(result)
  result.n = n
  result.v = v
  result.nseq = newSeq[int](n)

proc adder(c: var Count) =
  let c2 = c.addr
  parallelFor i in 0..<c.n:
    captures: {c2}
    c2.nseq[i] = i + c2.v

proc updateCounts(ncList: var seq[Count]) =
  let pList = cast[ptr UncheckedArray[Count]](ncList[0].addr)
  parallelFor i in 0..<ncList.len:
    captures: {pList}
    pList[i].adder()
  syncRoot(Weave) # Ensure that all parallel processing is finished before exiting the function

when isMainModule:
  init(Weave)
  let n = 11
  var nclist = newSeq[Count](n) # element type is Count; seq[var Count] is not a valid type
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr
  exit(Weave)

Capturing sequences or var parameters is not allowed, hence the boilerplate to translate them to pointers. That is because a parallel loop only starts the work: if you do not use syncRoot(Weave) or syncScope (only in the master branch), your function may return and free the data while the parallel processing is still using it.

proc updateCounts(ncList: var seq[Count]) =
  let pList = cast[ptr UncheckedArray[Count]](ncList[0].addr)
  parallelFor i in 0..<ncList.len:
    captures: {pList}
    pList[i].adder()
  syncRoot(Weave) # Ensure that all parallel processing is finished before exiting the function

proc updateCounts(ncList: var seq[Count]) =
  let pList = cast[ptr UncheckedArray[Count]](ncList[0].addr)
  syncScope: # Scope barrier: ensure that all parallel processing spawned in this section and its subfunctions is finished before continuing
    parallelFor i in 0..<ncList.len:
      captures: {pList}
      pList[i].adder()

I'm not sure at the moment how to do escape analysis to avoid that. Possibly --experimental:views with openArray as values might help, but those are tricky to work with.
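
For readers who have not met the seq-to-pointer translation before, here is a small sketch of just that step, single-threaded and standard library only. The helper name rawView is made up for this example. It adds a guard for the empty seq, because taking the address of element 0 of an empty seq raises an IndexDefect.

```nim
proc rawView[T](s: var seq[T]): ptr UncheckedArray[T] =
  ## Hypothetical helper: an unchecked view of the seq's elements.
  ## Returns nil for an empty seq, because s[0] would raise IndexDefect.
  if s.len > 0:
    result = cast[ptr UncheckedArray[T]](addr s[0])

var data = @[1, 2, 3]
let p = rawView(data)

# No bounds checks happen through p, so the length must travel separately.
for i in 0 ..< data.len:
  p[i] = p[i] * 10

doAssert data == @[10, 20, 30]

var empty: seq[int]
doAssert rawView(empty) == nil
```

The view is only valid while the seq is alive and is not resized: an add or setLen may reallocate the buffer and leave the pointer dangling, which is the same lifetime problem that syncRoot and syncScope guard against.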
Thank you.
Following @Araq's lead I did the same for Weave and came up with the code below. It works with both spawn and parallelFor, but it means doing away with the var.
Is the syncRoot still needed then?

import weave

type
  Count = object
    n: int
    v: int
    nseq: seq[int]

func initCount(n, v: int): Count {.inline.} =
  Count(n: n, v: v, nseq: newSeq[int](n))

func adder(c: ptr Count) =
  for i in 0..<c.n:
    c.nseq[i] = i + c.v

proc updateCounts(ncList: seq[Count]) =
  init(Weave)
  for i in 0..<ncList.len:
    spawn adder(addr ncList[i])
  exit(Weave)

when isMainModule:
  let n = 11
  var nclist = newSeq[Count](n)
  for i in 0..<n:
    nclist[i] = initCount(i, i*2)
  ncList.updateCounts()
  echo ncList.repr

syncRoot is not needed here, because the first thing exit(Weave) does is call syncRoot:
https://github.com/mratsim/weave/blob/7682784/weave/runtime.nim#L178-L184
However, thread pools are usually started at the very beginning of the program and exited at the very end, because creating threads is very expensive; most of the time you do not init/exit just around a for loop.
Also, a parallelFor (or a nested parallelFor) will schedule and load-balance more efficiently than spawning in a loop. With spawn you might create millions of tasks while you only have 8-32 threads, which is a lot of unneeded overhead and is especially cache-inefficient, whereas parallelFor creates only the tasks that are needed.
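
To illustrate the difference between one task per item and a few coarse tasks, here is a rough standard-library sketch. It is not how Weave schedules work internally; it only shows the idea of splitting an index range into one contiguous chunk per worker. The names Chunk, work, fillParallel and the constant Workers are made up for the example, and it assumes Nim 2.x, or Nim 1.6 with --threads:on.

```nim
# Assumption: Nim 2.x, or Nim 1.6 compiled with --threads:on.
when (NimMajor, NimMinor) >= (2, 0):
  import std/typedthreads

const Workers = 4           # hypothetical fixed worker count

type
  Chunk = object
    data: ptr UncheckedArray[int]
    lo, hi: int             # half-open index range [lo, hi)

proc work(c: Chunk) {.thread.} =
  # Each worker writes only to its own contiguous slice.
  for i in c.lo ..< c.hi:
    c.data[i] = i * 2

proc fillParallel(n: int): seq[int] =
  result = newSeq[int](n)
  if n == 0: return
  let base = cast[ptr UncheckedArray[int]](addr result[0])
  var threads: array[Workers, Thread[Chunk]]
  let step = (n + Workers - 1) div Workers   # ceiling division
  for w in 0 ..< Workers:
    let lo = min(w * step, n)
    let hi = min(lo + step, n)
    createThread(threads[w], work, Chunk(data: base, lo: lo, hi: hi))
  joinThreads(threads)      # wait before the seq is handed back

when isMainModule:
  echo fillParallel(10)     # prints @[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

However large n gets, this creates only Workers units of work. A real runtime such as Weave additionally balances uneven chunks between threads and reuses its threads instead of creating them per call, which this sketch does not attempt.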