Hi everyone, I have recently been experimenting with building a Go-style, CSP-based concurrency runtime in C. It has now evolved into a small M:N-scheduled runtime with:
A worker thread pool (M:N scheduler)
Go-style channels (buffered & unbuffered)
select over multiple channels
Timers and tickers
Context-style cancellation
WaitGroup and Mutex primitives
Optional signal-based preemption
I am primarily designing it so that I can use goroutine-like concurrency from Nim.
An obvious question is: why C? Because I want it to be usable from other languages as well, not just Nim.
I recently got a larger "grand example" working that combines worker pools, select with timeout, cancellation, and synchronization across multiple cores. It is still being stress-tested and hardened, but functionally it behaves like goroutine-style concurrency.
Since further testing will take time (some weeks to months), I want to ask the Nim community: is something like this useful, beyond my personal use?
I know that Nim has async/await (event-loop based), OS threads with --threads:on, and libraries like NimGo (though not actively maintained). I am not claiming this is better than any of them; I am building it for my own use for now, and if it performs well I will definitely take it further.
So I am here to ask: is there interest in Go-style CSP concurrency in Nim? Any advice is welcome (please avoid off-topic and history-related tangents).
I will try to port it if it performs as expected. Currently there is one minor problem in my code, mostly related to select integration with timers, but it can be resolved easily soon.
More importantly, part of my code uses assembly inside the C, so I will need time to find a solution for that.
Huge interest. I would say it's the one big language feature I miss from Go; even though using threads + threading/channels is not bad, it is not the same experience.
I like the level of abstraction that Go has on this topic and would love to see it in Nim (possibly natively).
Very interested in something like that. At the moment I am studying a piece of code that performs a fundamental computation: merging two pre-sorted lists/arrays/seqs. A novel algorithm called Merge Path [1] allows for a parallel merge. The only implementation I could find has been done in Go [2]. The simple Go code achieves parallelism using goroutines in two functions: (1) func findSplitPoints() (finding the merge path) and (2) func mergePath() to finally merge the lists following the merge path.

So far, on the Nim side, I have tried malebolgia, std/typedthreads and taskpools, hoping to achieve some gain in performance. My Nim code, though showing correct results, performs 3-10 times worse than the goroutines, independent of the list sizes, whereas the parallel Go code is 1-3 times faster than the sequential Go/Nim code, even on small lists. The sequential results of Nim and Go (func seriesMergeSort()) are roughly on par.

So far my thoughts/findings are: (a) maybe my Nim solutions are naive or stupid, since I am no expert in Nim concurrency, or (b) the Go machinery has some unique internal properties that allow a regular Joe like me to make successful use of threads/coroutines. After some investigation into the details of the Go machinery, I got the impression that behind the scenes it is able to distribute user tasks among one or more OS threads and/or coroutines/fibers, such that suddenly everybody is entitled to do concurrent computations without the need to comprehend the sophisticated details of the internal machinery. And this is truly awesome! The web is full of people asking for something as easily comprehensible and performant in use as goroutines, but sadly there is sweet nothing to be found :( Many have tried, though, and published a machinery/scheduler that promises to achieve exactly that. The current Nim solutions are getting better with every update (and I have not yet tried Weave).
Maybe your Nim engine can provide what at the moment no existing Nim package, and no other C/C++ coroutine/fiber/coloured-fancy/buzz project, can achieve. And if your machinery is capable of producing a performance gain when merging pre-sorted lists using some means of parallelism, that would be really awesome; that would be huge.
---
@Bosinski, share your Nim code and have somebody on the forum optimize it. There is little reason to assume that Golang does anything special for:
var wg sync.WaitGroup
for core := 0; core < numberOfCore; core++ {
    wg.Add(1)
    go func(coreNum int) { ...
And there is no reason whatsoever to assume that some new "concurrency runtime for Nim" will give you any better numbers...
Hello Araq,
thx for your interest and response. Find some Nim code using malebolgia attached below. Maybe you or somebody with expertise is able to improve/fix the code; I'd be very happy.
"And there is no reason whatsoever to assume that some new "concurrency runtime for Nim" will give you any better numbers..."

Why? Because this code is parallel, not concurrent.
I fully agree, and hope for a code fix that reconciles the measurements of the Nim/Go procs. BTW, I found another MergePath implementation, this time in C using OpenMP; I will look into it later. Not sure what the state of OpenMP in Nim is atm.
#[
file mergePath_male.nim
./nim c -r --assertions:on -d:lto -d:danger -d:useMalloc mergePath_male.nim
file mergePath.go
./go run mergePath.go -x 1048576 -y 1048576 -t 4
adapted from :: https://github.com/debuger6/MergePath/blob/main/src/merge_path.go
nim -v => Version 2.3.1 [MacOSX: amd64], Compiled at 2026-02-24
go version go1.26.0 darwin/amd64 installed 2026-02-24
]#
from std/random import rand, randomize
from std/strformat import fmt
from std/algorithm import sort
from std/monotimes import getMonotime, ticks
from std/cpuinfo import countProcessors
import malebolgia
randomize()
func now() :int64 = getMonotime().ticks
var master = createMaster()
proc seriesMergeSort( a,b :openArray[int] ) :seq[int] =
  result.setLen( a.len + b.len )
  var i, j, k :int
  while (i < a.len) and (j < b.len) :
    if a[i] < b[j] : result[k] = a[i] ; i.inc ; k.inc
    else : result[k] = b[j] ; j.inc ; k.inc
  if i < a.len :
    for idx in i .. a.high :
      result[k] = a[idx] ; k.inc
  if j < b.len :
    for idx in j .. b.high :
      result[k] = b[idx] ; k.inc
#[
proc seqFindSP( cores :int, a,b :seq[int] ) :seq[int] =
  result.setLen( cores * 2 )
  func work( coreId :int, result :ptr seq[int] ) =
    var maxX, minX, x, y :int
    let combineIdx = coreId * ( a.len + b.len ) div cores
    if combineIdx > a.len : maxX = a.len
    else : maxX = combineIdx
    while true :
      x = (maxX + minX) div 2
      y = combineIdx - x
      if y > b.len :
        y = b.len
        x = combineIdx - y
      if (y == 0) or (x == a.len) or (a[x] > b[y-1]) :
        if (x == 0) or (y == b.len) or (a[x-1] <= b[y]) :
          result[ 2 * coreId ] = x
          result[ 2 * coreId + 1] = y
          break
        else :
          maxX = x.pred
      else :
        minX = x.succ
  for coreId in 0 ..< cores : work( coreId, result.addr )
  result.add @[a.len, b.len]
]#
proc parFindSP( coreId,cores :int, a,b :seq[int], result :ptr seq[int] ) =
  var maxX, minX, x, y :int
  let combineIdx = coreId * ( a.len + b.len ) div cores
  if combineIdx > a.len : maxX = a.len
  else : maxX = combineIdx
  while true :
    x = (maxX + minX) div 2
    y = combineIdx - x
    if y > b.len :
      y = b.len
      x = combineIdx - y
    if (y == 0) or (x == a.len) or (a[x] > b[y-1]) :
      if (x == 0) or (y == b.len) or (a[x-1] <= b[y]) :
        result[ 2 * coreId ] = x
        result[ 2 * coreId + 1] = y
        break
      else :
        maxX = x.pred
    else :
      minX = x.succ
proc findSplitPoints( a,b :seq[int], cores :int ) :seq[int] =
  result.setLen( cores * 2 + 2 )
  ( result[ cores*2 ], result[ cores*2+1 ] ) = (a.len, b.len)
  master.awaitAll :
    for coreId in 0 ..< cores :
      master.spawn parFindSP(
        coreId, cores,
        a, b,
        result.addr
      )
proc parMergePath( coreId,cores :int, a,b,splitPoints :seq[int], result :ptr seq[int] ) =
  #[ The start and end points of the segment are obtained,
     and each thread is responsible for its own segment,
     so there will be no memory concurrency safety issues.
  ]#
  let
    startX = splitPoints[ 2 * coreId ]
    startY = splitPoints[ 2 * coreId + 1 ]
    endX   = splitPoints[ 2 * (coreId+1) ]
    endY   = splitPoints[ 2 * (coreId+1) + 1 ]
  var
    (j, k) = (startX, startY)
    idx = coreId * ( a.len + b.len ) div cores
  while (j < endX) and (k < endY) :
    if a[j] < b[k] : result[idx] = a[j] ; j.inc
    else : result[idx] = b[k] ; k.inc
    idx.inc
  while j < endX :
    result[idx] = a[j]
    j.inc ; idx.inc
  while k < endY :
    result[idx] = b[k]
    k.inc ; idx.inc
proc mergePath( a,b :seq[int], cores :int ) :seq[int] =
  result.setLen( a.len + b.len )
  let splitPoints = findSplitPoints( a, b, cores )
  # let splitPointsA = seqFindSP( cores, a, b ) # sequential-version
  # echo fmt"mergePath[{cores}] spA-{splitPointsA}"
  # echo fmt"mergePath[{cores}] spB-{splitPoints}"
  # assert splitPoints == splitPointsA, "???!"
  master.awaitAll :
    for coreId in 0 ..< max(1, cores) :
      master.spawn parMergePath(
        coreId, max(1, cores),
        a, b, splitPoints,
        result.addr
      )
when isMainModule :
  echo "mergePath_male::main"
  let
    MB = 1024 * 1024
    arrLen = 1 * MB
  var
    a, b :seq[int]
    t0 :int64
  a.setLen(arrLen) ; b.setLen(arrLen)
  for i in 0 ..< arrLen :
    a[i] = rand(10 * MB) ; b[i] = rand(10 * MB)
  a.sort() ; b.sort()
  t0 = now()
  let c1 = seriesMergeSort( a, b ) # sequential-merge
  let seqTime = now() - t0
  let cpus = countProcessors() div 2 # hyper-threading
  t0 = now()
  let c2 = mergePath( a, b, cpus ) # parallel-merge
  let parTime = now() - t0
  echo fmt"merged Seq-A and Seq-B into Seq-C of len-{c1.len}"
  echo seqTime, "-ns sequential"
  echo parTime, fmt"-ns parallel {cpus=} (Hyper-Threading reports 8-cpus DIV 2)"
  echo fmt"{parTime / seqTime:2.2f}x note : the sequential-time represents 1X-baseline."
  assert c1[0..128] == c2[0..128], "results-head don't match ?"
  assert c1[^128..c1.high] == c2[^128..c2.high], "results-tail don't match ?"
MergePath is for high-performance data operations. It is not related to tasks like background jobs, APIs (servers, streaming) or event-driven services, which is where goroutine-like concurrency is needed.
Data algorithms like MergePath (the well-known diagonal-partition merge) are used in CPU/GPU-bound work such as LLM inference and big-data processing, but they can't handle lifecycle management, dynamic work, or waiting (timers).
So we are mixing up parallelism (splitting one big task) with concurrency (managing many jobs at once).
In the real world we need both: for LLM-inference-style workloads we need diagonal-partition-merge-style algorithms, and for serving such workloads we need goroutine-like concurrency.
As for your comparison with Go: a goroutine's cost is very low, around 2 KB of stack, while in Nim we have to depend on spawning OS threads (although I am still learning Nim threading), and ARC/ORC is not thread-safe across threads, so it is difficult to achieve something like that directly. Go needed a lot of time to achieve it, even with very senior Google engineers. Go also has a different aim, so it has concurrency built into its core runtime, while Nim is a systems programming language (something Go is not), and therefore, like C and other systems languages, Nim has not seriously needed anything like that until now.
To solve the heavy-OS-thread and thread-safety problems together with Nim's memory management, I used a virtual memory manager that reserves 64 GB virtual heaps per CPU in the pure-Nim version of the library I was writing in C.
Currently I have tested 100k goroutines with a spawn rate of 45,000/s on a single core; I haven't implemented or tested multicore yet.
These values should not mislead anyone: Go can handle millions of goroutines and a 100-300k/s spawn rate. Another major drawback is that my current code is Linux-only (it uses assembly and POSIX).
As for what you said about better performance for parallelism: to get much better performance than Go, I recommend using or designing something like OpenMP or Intel TBB (oneTBB) in Nim. You can use them directly thanks to Nim's excellent interop, design something similar in Nim, or make something like Rust's Rayon for Nim.
So, in conclusion: to make data algorithms like MergePath (and similar tasks) much faster, Nim doesn't need something goroutine-like; Nim needs an efficient built-in (or well-implemented library) parallel runtime.
Hi Araq and Ghazali,
@Araq, thx for pointing me towards malebolgia's parallel algos; I'll try them soon. A nice in-between result after I tried Weave: on the first try, the results for the parallel computations dropped from 5-10x (meaning it took x times longer than the sequential version) to below 1.0x! (keeping in mind that I'm a Weave greenhorn). The best I've seen was 0.42x using four HW threads. So I'm now confident that (without needing to package the Go runtime 8=)) ) one can come near the almost-linear scaling that the MergePath authors report in their paper.
I got a lot better sequential performance in Nim.
That sounds interesting; would you share the sequential func seriesMergeSort here?
I did some more changes and can now confirm that my latest parallel versions, using malebolgia and taskpools as well, come down to 0.20-0.27x, which is on par with (maybe slightly better than) the Go version. I'm sure one can do the same with Weave, but that needs a bit more fiddling... MergePath is now generic and can benchmark e.g. float32.
from std/random import rand, randomize
from std/strformat import fmt
from std/algorithm import sort
from std/monotimes import getMonotime, ticks
from std/cpuinfo import countProcessors
from std/hashes import hashData
import taskpools
type UArr[T] = UncheckedArray[T]
const MB = 1024 * 1024
func now() :int64 = getMonotime().ticks
randomize()
func seriesMergeSort[T]( a,b :ref seq[T] ) :seq[T] =
  let ( aLen, bLen ) = ( a[].len, b[].len )
  result.setLen( aLen + bLen )
  var i, j, k :int
  while (i < aLen) and (j < bLen) :
    if a[i] < b[j] : result[k] = a[i] ; i.inc ; k.inc
    else : result[k] = b[j] ; j.inc ; k.inc
  if i < aLen :
    for idx in i .. a[].high :
      result[k] = a[idx] ; k.inc
  if j < bLen :
    for idx in j .. b[].high :
      result[k] = b[idx] ; k.inc
var tp = Taskpool.new( num_threads = countProcessors() )
proc parFindSP[T]( coreId,cores, aLen,bLen :int; a,b :ptr UArr[T]; result :ptr seq[int] ) =
  var maxX, minX, x, y :int
  let combineIdx = coreId * ( aLen + bLen ) div cores
  maxX = ( if combineIdx > aLen : aLen else : combineIdx )
  while true :
    x = (maxX + minX) div 2
    y = combineIdx - x
    if y > bLen :
      y = bLen
      x = combineIdx - y
    if (y == 0) or (x == aLen) or (a[x] > b[y-1]) :
      if (x == 0) or (y == bLen) or (a[x-1] <= b[y]) :
        result[ 2 * coreId ] = x
        result[ 2 * coreId + 1] = y
        break
      else :
        maxX = x.pred
    else :
      minX = x.succ
proc findSplitPoints[T]( a,b :ptr UArr[T], cores,aLen,bLen :int ) :seq[int] =
  result.setLen 2 * cores
  result.add @[ aLen, bLen ]
  for coreId in 0 ..< cores :
    tp.spawn parFindSP(
      coreId, cores,
      aLen, bLen, a, b,
      result.addr
    )
  tp.syncAll()
proc parMergePath[T](
    a,b :ptr UArr[T],
    startX, startY,
    endX, endY :int,
    results :ptr UArr[T]
  ) =
  var
    (j, k) = (startX, startY)
    idx :int
  while (j < endX) and (k < endY) :
    if a[j] < b[k] : results[idx] = a[j] ; j.inc
    else : results[idx] = b[k] ; k.inc
    idx.inc
  while j < endX :
    results[idx] = a[j]
    j.inc ; idx.inc
  while k < endY :
    results[idx] = b[k]
    k.inc ; idx.inc
proc mergePath[T]( cores :int, a,b :ref seq[T] ) :ptr UArr[T] =
  let
    aLoc = cast[ptr UArr[T]]( a[0].addr )
    bLoc = cast[ptr UArr[T]]( b[0].addr )
    splitPoints = findSplitPoints( aLoc, bLoc, cores, a[].len, b[].len )
    ll = a[].len + b[].len
  # let splitPointsA = seqFindSP( cores, a, b ) # sequential-version
  # echo fmt"mergePath[{cores}] spA-{splitPointsA}"
  # echo fmt"mergePath[{cores}] spB-{splitPoints}"
  # assert splitPoints == splitPointsA, "???!"
  result = cast[ptr UArr[T]]( allocShared( sizeof(T) * ll ))
  let sp = splitPoints
  for coreId in 0 ..< cores :
    let idx = 2 * coreId
    tp.spawn parMergePath(
      aLoc, bLoc,
      sp[idx], sp[idx+1], sp[idx+2], sp[idx+3],
      cast[ptr UArr[T]]( result[coreId * ll div cores].addr )
    )
  tp.syncAll()
when isMainModule :
  echo "mergePath_taskpool::main"
  let arrLen = 4 * MB
  var
    t0 :int64
    a, b = new seq[float32]
    i :int
  a[].setLen( arrLen ) ; b[].setLen( arrLen )
  while i < arrLen :
    a[i] = rand(15*MB).float32 ; b[i] = rand(15*MB).float32
    i.inc
  a[].sort() ; b[].sort()
  t0 = now()
  var c3 = a[] & b[]
  c3.sort() # sort by std/algorithm
  let stdTime = now() - t0
  t0 = now()
  let
    c1 = a.seriesMergeSort b # sequential-merge
    seqTime = now() - t0
  let cpus = countProcessors() # div 2 # halved 'cos of hyper-threading
  t0 = now()
  let
    c2 = cpus.mergePath( a, b ) # n-way parallel-merge
    parTime = now() - t0
  assert hashData(c1[0].addr, c1.len) == hashData(c2[0].addr, c1.len), "Hash mismatch !"
  echo fmt"merged Seq-A and Seq-B into Seq-C of len-{c1.len} x {$typeof(c1[0])}"
  echo stdTime, "-ns std/sort"
  echo seqTime, "-ns sequential"
  echo parTime, fmt"-ns parallel {cpus=}"
  echo fmt"{parTime / seqTime:2.2f}x note : the sequential-time represents 1X-baseline."
  echo fmt"{stdTime / c1.len:2.2f}-ns/element std/sort !! this is not a fair bench !!"
  echo fmt"{seqTime / c1.len:2.2f}-ns/element sequential."
  echo fmt"{parTime / c1.len:2.2f}-ns/element parallel."
  echo fmt"c1 {c1[0..10]} {$hashData(c1[0].addr, c1.len)}"
  echo fmt"c2 {c2.toOpenArray(0, (2*arrLen).pred)[0..10]} {$hashData(c2[0].addr, c1.len)}"
  echo fmt"c1 {c1[^10..^1]}"
  echo fmt"c2 {c2.toOpenArray(0, (2*arrLen).pred)[^10..^1]}"
  c2.deallocShared()
  tp.shutdown()

The code I used is (note: benchmarks on my side may vary due to caching):
import std/[random, algorithm, monotimes, cpuinfo, strformat]
import malebolgia
{.push checks:off, boundChecks:off, overflowChecks:off.}
proc now(): int64 {.inline.} =
  getMonotime().ticks

var master = createMaster()

# Sequential merge (baseline)
proc seqMerge(a, b: ptr UncheckedArray[int],
              alen, blen: int,
              dst: ptr UncheckedArray[int]) {.inline.} =
  var i, j, k: int
  while i < alen and j < blen:
    if a[i] <= b[j]:
      dst[k] = a[i]; inc i
    else:
      dst[k] = b[j]; inc j
    inc k
  while i < alen:
    dst[k] = a[i]; inc i; inc k
  while j < blen:
    dst[k] = b[j]; inc j; inc k

# Find split points (sequential)
proc findSplits(a, b: ptr UncheckedArray[int],
                alen, blen, cores: int,
                splits: ptr UncheckedArray[int]) =
  let total = alen + blen
  for core in 0 ..< cores:
    let diag = core * total div cores
    var lo = max(0, diag - blen)
    var hi = min(diag, alen)
    while lo <= hi:
      let x = (lo + hi) shr 1
      let y = diag - x
      if x < alen and y > 0 and a[x] < b[y-1]:
        lo = x + 1
      elif x > 0 and y < blen and a[x-1] > b[y]:
        hi = x - 1
      else:
        splits[core*2] = x
        splits[core*2+1] = y
        break
  splits[cores*2] = alen
  splits[cores*2+1] = blen

# Parallel worker
proc worker(core, cores: int,
            a, b, splits, dst: ptr UncheckedArray[int],
            alen, blen, total: int) {.inline.} =
  let sx = splits[core*2]
  let sy = splits[core*2+1]
  let ex = splits[(core+1)*2]
  let ey = splits[(core+1)*2+1]
  var i = sx
  var j = sy
  var k = core * total div cores
  while i < ex and j < ey:
    if a[i] <= b[j]:
      dst[k] = a[i]; inc i
    else:
      dst[k] = b[j]; inc j
    inc k
  while i < ex:
    dst[k] = a[i]; inc i; inc k
  while j < ey:
    dst[k] = b[j]; inc j; inc k

# High-performance mergePath
proc mergePathFast(a, b: seq[int], cores: int): seq[int] =
  let alen = a.len
  let blen = b.len
  let total = alen + blen
  result = newSeqUninitialized[int](total)
  let ap = cast[ptr UncheckedArray[int]](unsafeAddr a[0])
  let bp = cast[ptr UncheckedArray[int]](unsafeAddr b[0])
  let dp = cast[ptr UncheckedArray[int]](unsafeAddr result[0])
  var splits = newSeqUninitialized[int](cores*2 + 2)
  let sp = cast[ptr UncheckedArray[int]](addr splits[0])
  findSplits(ap, bp, alen, blen, cores, sp)
  master.awaitAll:
    for c in 0 ..< cores:
      master.spawn worker(c, cores, ap, bp, sp, dp, alen, blen, total)

# Benchmark
when isMainModule:
  randomize()
  const MB = 1024 * 1024
  let arrLen = 10 * MB # IMPORTANT: must be large
  var a = newSeq[int](arrLen)
  var b = newSeq[int](arrLen)
  for i in 0 ..< arrLen:
    a[i] = rand(100_000_000)
    b[i] = rand(100_000_000)
  a.sort()
  b.sort()
  let physical = countProcessors()
  echo "Using physical cores: ", physical
  # Sequential
  var seqRes = newSeqUninitialized[int](arrLen*2)
  let ap = cast[ptr UncheckedArray[int]](unsafeAddr a[0])
  let bp = cast[ptr UncheckedArray[int]](unsafeAddr b[0])
  let dp = cast[ptr UncheckedArray[int]](addr seqRes[0])
  var t0 = now()
  seqMerge(ap, bp, arrLen, arrLen, dp)
  let seqTime = now() - t0
  # Parallel
  t0 = now()
  let parRes = mergePathFast(a, b, physical)
  let parTime = now() - t0
  echo fmt"Sequential: {seqTime div 1_000_000} ms"
  echo fmt"Parallel:   {parTime div 1_000_000} ms"
  echo fmt"Speedup:    {seqTime.float / parTime.float:2.2f}x"
  doAssert seqRes == parRes
{.pop.}