I'd appreciate any guidance here. My use case involves thousands of tasks, using the multiprocessor like a worker pool to churn through jobs. Each running task has huge memory and context data structures that take up a lot of space and are slow to allocate, so I keep ncores of them allocated and reuse them like a pool.
The problem is that Weave seems very sensitive to the maximum number of worker threads defined. Often, it seems like one thread never actually runs, so my program waits forever for that thread to complete.
Here's a small demo showing the kind of parallel coordination algorithm I think I need. I only see successful completion on my 8-core system with WEAVE_NUM_THREADS=2 or 4.
Also, it seems Weave requires `-d:useMalloc`?
```nim
import std/envvars
import weave
from os import sleep
import strformat, strutils

let ncores = getEnv("WEAVE_NUM_THREADS").parseInt
# Where is the setWeaveNumThreads(...) API?? putEnv before import weave?

var total = 0
var threads: seq[Flowvar[int]]
var work = @[0, 1, 2, 3]
echo "ncores: ", ncores

proc dowork(id: int): int =
  debugEcho &"running thread {id}"
  sleep 500
  return id
```
```nim
init(Weave)

block threadLoop:
  while true:
    # Re-evaluate both conditions on every iteration so the spawn loop
    # can terminate (binding them to `let`s before the loop would make
    # the condition constant and index past the end of `work`).
    while work.len > 0 and threads.len < ncores - 1:
      echo &"spawning thread {work[0]}"
      threads.add spawn dowork(work[0])
      work.del 0
    if threads.len == 0:
      break threadLoop
    for i in countdown(threads.len - 1, 0):
      echo &"checking completion of {threads.len} threads"
      if isReady(threads[i]):
        let id = sync(threads[i])
        echo &"finished thread {id}"
        total += 1
        threads.del i
    sleep 200

exit(Weave)

echo "FINISHED"
echo &"total: {total}"
```
In the meantime I will try to get a parallelForStaged version working, even though I think this version should work.
> `# Where is the setWeaveNumThreads(...) API?? putEnv before import weave?`
There is none.
> My use case involves thousands of tasks, using the multiprocessor like a worker pool to churn through jobs. Each running task has huge memory and context data structures that are slow to allocate, so I keep ncores of them allocated and reuse them like a pool.
The issue is that you're re-implementing the event loop to dispatch tasks. However, Weave's scheduling requires cooperation between threads (it's a variant of work-stealing called "work-requesting"), and your main thread running the `while true` loop does not cooperate, so tasks sent to that thread get stuck.
You can solve this by adding explicit Weave.loadBalance() calls like so:
```nim
init(Weave)

block threadLoop:
  while true:
    while work.len > 0 and threads.len < ncores - 1:
      echo &"spawning thread {work[0]}"
      threads.add spawn dowork(work[0])
      work.del 0
    Weave.loadBalance() # for example here <--------
    if threads.len == 0:
      break threadLoop
    for i in countdown(threads.len - 1, 0):
      echo &"checking completion of {threads.len} threads"
      if isReady(threads[i]):
        let id = sync(threads[i])
        echo &"finished thread {id}"
        total += 1
        threads.del i
    sleep 200

exit(Weave)
```
> Also, it seems Weave requires `-d:useMalloc`?
I haven't tested with ARC/ORC yet, but IIRC the "type rejected" checks are about destructors, not allocators. What errors do you get?
> In the meantime I will try to get a parallelForStaged version working, even though I think this version should work.
That would be better, as you would directly use Weave's event loop.
What kind of algorithm are you implementing?
> What errors do you get?
For `-d:useMalloc`, I will now go back and try your suggestions before jumping to conclusions. I was not seeing errors, but it appeared that `WEAVE_NUM_THREADS` was not being observed at all when `-d:useMalloc` was missing.
> What kind of algorithm are you implementing?
Simple, with a sum-reduce at the end, so I think plain parallelFor will work. I have a pool of data structures that I don't want to re-allocate between tasks, and I don't want to allocate more than ncores of them (or however many threads can run in parallel) because they're quite large. I am now using locks with pool borrowing/returning at the start and end of each task. I have thousands of tasks to process in a pleasantly parallel fashion; each task generates a seq of N numbers, and at the end I want the element-wise sum over all task results, producing a final seq of N numbers.
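Sketched roughly, the borrow/return pattern looks like this, using only `std/locks` (names like `Resource` and `borrowResource` are placeholders, and the real contexts are much larger):

```nim
# Minimal sketch of the lock-guarded pool: pre-allocate ncores resources
# once, then borrow at task start and return at task end.
import std/locks

type Resource = ref object
  buf: seq[int]          # stands in for the large, slow-to-allocate context

var
  poolLock: Lock
  pool: seq[Resource]

initLock(poolLock)

proc borrowResource(): Resource =
  ## Pop a pre-allocated resource; nil means every slot is busy.
  withLock poolLock:
    if pool.len > 0:
      result = pool.pop()

proc returnResource(r: Resource) =
  ## Hand the resource back for the next task to reuse.
  withLock poolLock:
    pool.add r

# One-time allocation, one resource per core.
let ncores = 8
for _ in 0 ..< ncores:
  pool.add Resource(buf: newSeq[int](1024))
```

Each parallel task then calls `borrowResource` on entry and `returnResource` on exit, so at most ncores contexts ever exist.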
My C++ version of this code was using a threadpool library and only launching new threads when there was capacity in the coordinator, so that was the algorithm for my initial port to Nim before investigating parallelFor. I'm curious to dig more into Weave's parallelReduce to see if that can clean up my code even more.
> There is none.
I tried Malebolgia at first, and had to modify its source so I could specify the max worker threads at runtime, letting the user pass --threads 12 for example. However, my Nim version was very slow (even with `-d:release`, `-d:danger`, LTO, on Linux, etc.). After thoroughly profiling my code and substituting various optimizations in place of the initially much clearer code, I decided that something about Malebolgia was probably causing a lot of slowness all by itself. Again, maybe I was using it incorrectly. So, instead of doing a deep dive into Malebolgia, since the library is still in its early stages, I decided to give Weave a try. I hoped it might resolve the huge slowdown I was seeing compared to the C++ version, and that maybe I would not have to alter Weave's source code just to support a runtime max worker thread count via a CLI argument.
> I decided that something about Malebolgia was probably causing a lot of slowness all by itself.
Hey hey hey... you could have reported a bug, you know.
You can try weave-io just in case. It supports runtime num_threads.
Here is how to do a reduction: https://github.com/mratsim/weave-io/blob/master/examples/e04_parallel_reduce.nim#L23
You can do per-task memory alloc/dealloc in the prologue and epilogue. There won't be more in flight than the number of threads you have set up.
Now the backend is optimized for compute/throughput, and in the future it will be optimized for IO/latency, but the API will stay the same, and is the same as Weave with an extra tp/Threadpool parameter.
Also, there is no need for regular loadBalance calls, as it's work-stealing-based instead of work-requesting, i.e. workers do not need the cooperation of others to distribute work.
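For illustration, the staged pattern looks roughly like this in Weave's `parallelForStaged` syntax (a sketch only, not verbatim from the linked example; defer to that example for the exact weave-io API, which adds the threadpool parameter mentioned above):

```nim
# Sketch: per-chunk alloc in `prologue`, reuse across `loop`, merge and
# release in `epilogue`.
import weave

init(Weave)

var total: int
let res = total.addr   # capture a pointer to the shared accumulator

parallelForStaged i in 0 ..< 1000:
  captures: {res}
  awaitable: iLoop
  prologue:
    # Runs once per worker chunk: allocate the big per-task context here.
    var ctx = newSeq[int](64)   # stands in for the large context
    var localSum = 0
  loop:
    ctx[0] = i                  # every `i` in this chunk reuses `ctx`
    localSum += i
  epilogue:
    # Runs once per worker chunk: merge the partial sum; `ctx` is freed.
    atomicInc(res[], localSum)

discard sync(iLoop)
exit(Weave)
echo total   # the partial sums add up to the sum over 0 ..< 1000
```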
@mratsim this is just what I was looking for. Fantastic work, and thank you for sharing your library. At one point I thought I had run into a bug in your library, but it turned out I was using excessive locks; everything resolved once I used the prologue/loop/epilogue paradigm correctly, which also reduced the apparent code complexity.
I call a worker function as the body of the parallelFor loop, with resource acquisition via locks at the start. I found that I need to wrap these in `{.gcsafe.}:` and try/except blocks, but it does work now.
It is fantastic that with relatively clean code I can get within 1.7x of the C++ speed. Next I will work on tuning the code to get closer. I suspect the heavy use of Tables, as a direct port of the C++ unordered_map, may account for much of the difference.
To put a bow on this thread for now. C++ speed parity achieved!
I've rewritten my code without any threadpool library, relying only on creating one thread per requested core and having those threads block/wait on Channels of a simple enum type [Ready, Begin, Quit]. The main host thread looks for finished threads in another channel via tryRecv, fills in the data they already hold a pointer to, and tells them via their channel to go ahead and process. The main thread never sleeps and instead busy-waits to submit more work. Despite having thousands of work items to get through, this now gets the runtime imperceptibly close to the C++ threadpool version. I'm happy enough with this, and as a very minor bonus I can even reduce my dependencies. However, this has me rethinking whether I know anything about threaded programming...
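For reference, here is a stripped-down sketch of that handshake with a single worker, using only Nim's built-in `Channel` and `createThread` (the names, the enum, and the fixed job count are illustrative, not my real code):

```nim
# Sketch: main thread busy-waits on `done` for a free worker, then
# signals it over `cmd`. Compile with threads enabled (default in Nim 2).
type Msg = enum Ready, Begin, Quit

var
  cmd: Channel[Msg]    # main -> worker commands
  done: Channel[int]   # worker -> main completion notices

proc worker(id: int) {.thread.} =
  done.send id         # announce readiness
  while true:
    case cmd.recv()    # block until the main thread signals us
    of Begin:
      # ... process the work this worker already holds a pointer to ...
      done.send id     # report completion, ask for more
    of Quit:
      break
    of Ready:
      discard

cmd.open()
done.open()
var t: Thread[int]
createThread(t, worker, 0)

# Main thread: busy-wait dispatching a fixed number of jobs.
var jobs = 3
while jobs > 0:
  let (gotMsg, _) = done.tryRecv()
  if gotMsg:
    cmd.send Begin     # worker is free: hand it the next job
    dec jobs
cmd.send Quit
joinThread(t)
echo "all jobs dispatched"
```

With one channel pair per worker (as described above) the main thread can route each job to the specific thread that finished.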