Let's say we have a loop performing some operation over a sequence of items, with each operation unrelated to the others. Is there any way to have two or more of them processed at the same time (using different threads or cores)?
Is there any related example?
cpuTime and similar CPU-time clocks are tricky to use for multithreaded code because they count the cycles spent by the CPU on every core: if you have a parallel workload that takes 1000 cycles per core on 10 cores, you might get a report of 10000 cycles (divided by the CPU frequency). With multithreading overhead on top, you might report a total of 12000 cycles versus 10000 cycles for serial processing on a single core, and the parallel version then seems slower even though it finished earlier in wall-clock time.
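For illustration, here is a minimal sketch contrasting the two clocks from the standard times module (the workload placeholder is yours to fill in; the exact behaviour of cpuTime varies a bit by platform):

import times

let cpuStart  = cpuTime()    # process CPU time, summed over all cores
let wallStart = epochTime()  # wall-clock time
# ... run the parallel workload here ...
echo "CPU time:  ", cpuTime() - cpuStart, " s"
echo "Wall time: ", epochTime() - wallStart, " s"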
Now, for your use case I would use either OpenMP or cook up a parallel-for using raw spawn. Here you go; comments are inline. Make sure to write a benchmark that the compiler cannot optimize away.
import cpuinfo, times, strformat, math, threadpool
# Bench script
# ----------------------------------------------------------------------------------------------------
template benchmark(benchName: string, body: untyped) =
  # When multithreading, make sure to measure wall-clock time.
  # If you use CPU time you might measure the CPU time on each processor
  # and then add them together.
  let start = epochTime()
  body
  let stop = epochTime()
  let elapsed = stop - start
  # strformat is again broken in templates :/
  # echo &"Wall time for {benchName:<20}: {elapsed:3.4f}s"
  echo "Wall time for ", benchName, ": ", round(elapsed, 3), " s"
# OpenMP
# ----------------------------------------------------------------------------------------------------
# Add OpenMP to compilation flags
{.passC:"-fopenmp".}
{.passL:"-fopenmp".}
# Nim native threading
# ----------------------------------------------------------------------------------------------------
template parallelChunks(start, stop: int, chunkOffset, chunkSize: untyped{ident}, body: untyped): untyped =
  ## In-place declares and defines `chunkOffset` and `chunkSize`,
  ## which correspond to the slice of the start..stop range
  ## that will be processed on the same core.
  let
    numIters = stop - start
    numChunks = countProcessors()
    baseChunkSize = numIters div numChunks
    remainder = numIters mod numChunks
  # The following simple chunking scheme can lead to severe load imbalance:
  #
  #   `chunkOffset`{.inject.} = baseChunkSize * threadID
  #   `chunkSize`{.inject.} = if threadID < numChunks - 1: baseChunkSize
  #                           else: numIters - chunkOffset # remainder if division isn't exact
  #
  # For example, dividing 40 items over 12 threads gives
  # a baseChunkSize of 40 div 12 = 3, so the first 11 threads
  # get 3 * 11 = 33 items and the last thread gets the remaining 7.
  # Instead of dividing 40 work items on 12 cores into
  #   3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 7 = 3*11 + 7 = 40
  # the following scheme divides them into
  #   4, 4, 4, 4, 3, 3, 3, 3, 3, 3, 3, 3 = 4*4 + 3*8 = 40
  var chunkOffset {.inject.}, chunkSize {.inject.}: Natural
  for threadID in 0 ..< numChunks:
    if threadID < remainder:
      chunkOffset = start + (baseChunkSize + 1) * threadID
      chunkSize = baseChunkSize + 1
    else:
      chunkOffset = start + baseChunkSize * threadID + remainder
      chunkSize = baseChunkSize
    block: body
# Benches
# ----------------------------------------------------------------------------------------------------
let maxLim = 1_000_000
proc doSth(i: int) =
  # Note: in a real benchmark, make sure the compiler cannot
  # optimize this work away (e.g. accumulate into a global).
  discard i * 2

benchmark "normal loop":
  for i in 1 .. maxLim:
    doSth(i)
# Don't interleave stacktraces: they require allocating strings,
# and OpenMP doesn't create a GC; it would require "setupForeignThreadGc()"
{.push stackTrace:off.}
benchmark "parallelized OpenMP":
for i in 1||maxLim:
doSth(i)
{.pop.}
proc chunkedDoSth(chunkOffset, chunkSize: Natural) =
  ## A chunk is processed on the same core
  for i in chunkOffset ..< chunkOffset + chunkSize:
    doSth(i)
benchmark "parallelized Nim spawn":
parallelChunks(1, maxLim, chunkOffset, chunkSize):
# Spawn a task for each chunk
spawn chunkedDoSth(chunkOffset, chunkSize)
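To build and run this (assuming the file is saved as bench.nim), something like:

nim c -r -d:release --threads:on bench.nim

--threads:on is required for threadpool's spawn; the OpenMP loop only needs the -fopenmp flags passed above, and -d:release matters for any benchmark.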
First of all, thanks for the very detailed answer! I really appreciate it!
I have been using pretty much the same benchmark code, so times should be rather accurate.
I tried your examples and they seem to be working (by the way, on macOS, the -fopenmp flag seems redundant for Clang, as it supposedly has built-in support for OpenMP -- unless I'm mistaken).
The times I'm getting are pretty much these (for 1 billion repetitions):
Wall time for normal loop: 16.175 s
Wall time for parallelized OpenMP: 16.178 s
Wall time for parallelized Nim spawn: 0.0 s
Note: for fewer repetitions, the OpenMP-based benchmark seems to be around 20-30% faster.
Now, I have a question regarding the last solution: is there any way that I can sync all the spawned tasks? (I mean... know when all of them have finished)
You're mistaken: there is no OpenMP in Apple's default toolchain on OSX. Apple wants to push everyone toward Grand Central Dispatch (which, speaking as someone writing a multithreading runtime, is a fine library, but not portable to Windows at the moment).
So if you're benchmarking on OSX you should get GCC or Clang from Homebrew.
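For example (gcc-13 here stands for whatever version Homebrew currently ships):

brew install gcc
nim c -r -d:release --threads:on --cc:gcc --gcc.exe:gcc-13 --gcc.linkerexe:gcc-13 bench.nim

since the plain gcc binary on macOS is an alias for Apple's Clang.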
To wait for all of them, you can add sync() at the end. To wait on a specific spawn, you unfortunately have to return a dummy value at the moment, due to https://github.com/nim-lang/Nim/issues/8040.
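As a minimal sketch of the wait-for-all case (work is just a stand-in proc):

import threadpool

proc work(i: int) =
  discard i * 2

for i in 0 ..< 4:
  spawn work(i)
sync() # blocks until every spawned task has completed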
So, without further ado:
import cpuinfo, times, math, threadpool
# Bench script
# ----------------------------------------------------------------------------------------------------
template benchmark(benchName: string, body: untyped) =
  # When multithreading, make sure to measure wall-clock time.
  # If you used CPU time you would measure the cumulated CPU time
  # across all processors.
  let start = epochTime()
  body
  let stop = epochTime()
  let elapsed = stop - start
  echo "Wall time for ", benchName, ": ", round(elapsed, 3), " s"
# OpenMP
# ----------------------------------------------------------------------------------------------------
# Add OpenMP to compilation flags
{.passC:"-fopenmp".}
{.passL:"-fopenmp".}
# Nim native threading
# ----------------------------------------------------------------------------------------------------
template parallelChunks(start, stop: int, chunkOffset, chunkSize, threadID: untyped{ident}, body: untyped): untyped =
  ## In-place declares and defines `chunkOffset`, `chunkSize` and `threadID`,
  ## which correspond to the slice of the start..stop range
  ## that will be processed on the same core.
  let
    numIters = stop - start
    numChunks = countProcessors()
    baseChunkSize = numIters div numChunks
    remainder = numIters mod numChunks
  # See the chunking comments in the previous snippet for why the
  # remainder is spread over the first threads instead of being
  # dumped entirely on the last one.
  var chunkOffset {.inject.}, chunkSize {.inject.}: Natural
  for threadID {.inject.} in 0 ..< numChunks:
    if threadID < remainder:
      chunkOffset = start + (baseChunkSize + 1) * threadID
      chunkSize = baseChunkSize + 1
    else:
      chunkOffset = start + baseChunkSize * threadID + remainder
      chunkSize = baseChunkSize
    block: body
# Benches
# ----------------------------------------------------------------------------------------------------
let maxLim = 1_000_000
proc doSth(i: int) =
  # Note: in a real benchmark, make sure the compiler cannot
  # optimize this work away (e.g. accumulate into a global).
  discard i * 2

benchmark "normal loop":
  for i in 1 .. maxLim:
    doSth(i)
# Don't interleave stacktraces: they require allocating strings,
# and OpenMP doesn't create a GC; it would require "setupForeignThreadGc()"
{.push stackTrace:off.}
benchmark "parallelized OpenMP":
for i in 1||maxLim:
doSth(i)
{.pop.}
type
  Dummy = bool
    ## Allows waiting on void spawns: https://github.com/nim-lang/Nim/issues/8040

proc chunkedDoSth(chunkOffset, chunkSize: Natural): Dummy =
  ## A chunk is processed on the same core.
  ## It returns a dummy value so that we can wait on it.
  for i in chunkOffset ..< chunkOffset + chunkSize:
    doSth(i)
# Let's not use heap allocation for this, but we need to wrap it in a proc
when defined(windows):
  proc alloca(size: csize): pointer {.header: "<malloc.h>".}
else:
  proc alloca(size: csize): pointer {.header: "<alloca.h>".}
proc parNimSpawn() =
  var tasks: ptr UncheckedArray[FlowVar[Dummy]]
  tasks = cast[type(tasks)](alloca(countProcessors() * sizeof(FlowVar[Dummy])))
  # Transforming this into a nice "parallel_for" is left as an exercise for the reader
  parallelChunks(1, maxLim, chunkOffset, chunkSize, threadID):
    # Spawn a task for each chunk
    tasks[threadID] = spawn chunkedDoSth(chunkOffset, chunkSize)
  # Wait for all tasks
  for i in 0 ..< countProcessors():
    discard ^tasks[i]

benchmark "parallelized Nim spawn":
  parNimSpawn()