I am trying to parallelize kernel matrix calculations using threadpool:
proc calculateKernelMatrix*(K: AbstractKernel, data: Matrix[F]): Matrix[F] =
  let n = int64(ncol(data));
  var mat = Matrix[F](data: newSeq[F](n*n), dim: @[n, n]);
  for j in 0..<n:
    for i in j..<n:
      var tmp: F;
      spawn kernel(K, data.col(i), data.col(j), tmp.addr);
      mat[i, j] = tmp;
      mat[j, i] = tmp;
  sync();
  return mat;
I'd like each j to run in a new thread rather than spawning one task per (i, j).
I have noticed that multiple CPUs are not being used: each task is spawned but seems to wait until it finishes before the next one starts, so only one CPU is busy at a time. Is there something I should check about my implementation to make sure I'm not doing something obviously wrong? Matrix[F] is a ref object and is only read from, so it should be fine.
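For what it's worth, here is a minimal, self-contained check I can run to see whether spawn overlaps tasks at all (a rough sketch; busyWait is just a hypothetical placeholder for one unit of kernel work, and the program is compiled with --threads:on):

import threadpool, os, times

proc busyWait(ms: int) =
  # Placeholder for one unit of kernel work; just blocks this thread.
  sleep(ms)

when isMainModule:
  let start = epochTime()
  for _ in 0 ..< 8:
    spawn busyWait(250)
  sync()
  # Roughly 0.25 s if the tasks overlap across cores,
  # close to 2 s if they run one after another.
  echo "elapsed: ", epochTime() - start, " s"

Here is the per-j attempt: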
proc subFor(j: int64, n: int64, K: AbstractKernel, data: Matrix[F], mat: Matrix[F]): void {.thread.} =
  for i in j..<n:
    var tmp: F = kernel(K, data.col(i), data.col(j));
    mat[i, j] = tmp;
    mat[j, i] = tmp;
  return;

proc calculateKernelMatrix*(K: AbstractKernel, data: Matrix[F]): Matrix[F] =
  let n = int64(ncol(data));
  var mat = Matrix[F](data: newSeq[F](n*n), dim: @[n, n]);
  for j in 0..<n:
    spawn subFor(j, n, K, data, mat);
  sync();
  return mat;
Nested parallel for loops are non-trivial; even OpenMP doesn't get them right, and Nim's threadpool doesn't have the tools to deal with them.
You can use Weave for this; see this matrix transposition example from the README: https://github.com/mratsim/weave#data-parallelism
import weave

func initialize(buffer: ptr UncheckedArray[float32], len: int) =
  for i in 0 ..< len:
    buffer[i] = i.float32

proc transpose(M, N: int, bufIn, bufOut: ptr UncheckedArray[float32]) =
  ## Transpose a MxN matrix into a NxM matrix with nested for loops

  parallelFor j in 0 ..< N:
    captures: {M, N, bufIn, bufOut}
    parallelFor i in 0 ..< M:
      captures: {j, M, N, bufIn, bufOut}
      bufOut[j*M+i] = bufIn[i*N+j]

proc main() =
  let M = 200
  let N = 2000

  let input = newSeq[float32](M*N)
  # We can't work with seq directly as it's managed by GC, take a ptr to the buffer.
  let bufIn = cast[ptr UncheckedArray[float32]](input[0].unsafeAddr)
  bufIn.initialize(M*N)

  var output = newSeq[float32](N*M)
  let bufOut = cast[ptr UncheckedArray[float32]](output[0].addr)

  init(Weave)
  transpose(M, N, bufIn, bufOut)
  exit(Weave)

main()
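If it helps to map that pattern back onto your problem, here is a rough, untested sketch of the kernel-matrix loop parallelized over j only. Everything problem-specific here is an assumption for illustration: flat column-major float32 buffers stand in for your Matrix[F]/AbstractKernel types, and dotKernel is a hypothetical stand-in for your kernel proc.

import weave

func dotKernel(a, b: ptr UncheckedArray[float32], nrows: int): float32 =
  ## Plain dot product of two columns, as a stand-in kernel function.
  for k in 0 ..< nrows:
    result += a[k] * b[k]

proc kernelMatrix(nrows, ncols: int,
                  data, K: ptr UncheckedArray[float32]) =
  ## Parallelize over j only; the inner loop over i stays serial
  ## inside each task, matching the per-j split asked for above.
  parallelFor j in 0 ..< ncols:
    captures: {nrows, ncols, data, K}
    for i in j ..< ncols:
      let cij = dotKernel(
        cast[ptr UncheckedArray[float32]](data[i*nrows].addr),
        cast[ptr UncheckedArray[float32]](data[j*nrows].addr),
        nrows)
      K[i*ncols + j] = cij
      K[j*ncols + i] = cij

proc main() =
  let nrows = 10
  let ncols = 100
  var input = newSeq[float32](nrows*ncols)
  for k in 0 ..< input.len:
    input[k] = float32(k mod 7)
  var output = newSeq[float32](ncols*ncols)
  # As in the README example, take raw pointers to the seq buffers.
  let bufIn = cast[ptr UncheckedArray[float32]](input[0].addr)
  let bufOut = cast[ptr UncheckedArray[float32]](output[0].addr)

  init(Weave)
  kernelMatrix(nrows, ncols, bufIn, bufOut)
  syncRoot(Weave)   # barrier: wait for all tasks before using the result
  exit(Weave)

main()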
Or for strided loops:
import weave

init(Weave)

# expandMacros:
parallelForStrided i in 0 ..< 100, stride = 30:
  parallelForStrided j in 0 ..< 200, stride = 60:
    captures: {i}
    log("Matrix[%d, %d] (thread %d)\n", i, j, myID())

exit(Weave)