I was trying to port this example to use Nim threads and channels.
My implementation is:
import times, math

proc calcPartialPi(s: tuple[r: Channel[float], iter: int]) =
  var (r, iter) = s
  var count = 0
  for _ in 1 .. iter:
    let (x, y) = (random(1.0), random(1.0))
    if (x * x + y * y) < 1:
      count += 1
  r.send(4.0 * count.float / iter.float)

proc calcPi(iter, processes: int): float =
  var acc = 0.0
  var r: Channel[float]
  r.open()
  for _ in 1 .. processes:
    var th: Thread[(Channel[float], int)]
    createThread(th, calcPartialPi, (r, iter))
  for _ in 1 .. processes:
    acc += r.recv()
  return acc / processes.float

proc main() =
  const
    iter = 50000
    processes = 500
  let s = epochTime()
  echo calcPi(iter, processes)
  let time = epochTime() - s
  echo "Time elapsed: ", time

when isMainModule:
  main()
The configuration is
--define:release
--threads:on
I am sure it used to run correctly just a few minutes ago. Then I made other programs that make use of channels and threads, and now, after recompiling it, it segfaults every time.
Can anyone spot if I am doing anything wrong?
import times, math

# for auto dereferencing of ptr in SharedChannel
{.experimental.}

type SharedChannel[T] = ptr Channel[T]

proc newSharedChannel[T](): SharedChannel[T] =
  result = cast[SharedChannel[T]](allocShared0(sizeof(Channel[T])))
  open(result[])

proc close[T](ch: var SharedChannel[T]) =
  close(ch[])
  deallocShared(ch)
  ch = nil

proc calcPartialPi(s: tuple[r: SharedChannel[float], iter: int]) =
  var (r, iter) = s
  var count = 0
  for _ in 1 .. iter:
    let (x, y) = (random(1.0), random(1.0))
    if (x * x + y * y) < 1:
      count += 1
  r.send(4.0 * count.float / iter.float)

proc calcPi(iter, processes: int): float =
  var acc = 0.0
  var r = newSharedChannel[float]()
  for _ in 1 .. processes:
    var th: Thread[(SharedChannel[float], int)]
    createThread(th, calcPartialPi, (r, iter))
  for _ in 1 .. processes:
    acc += r.recv()
  return acc / processes.float

proc main() =
  const
    iter = 50000
    processes = 500
  let s = epochTime()
  echo calcPi(iter, processes)
  let time = epochTime() - s
  echo "Time elapsed: ", time

when isMainModule:
  main()
I don't think
for _ in 1 .. processes:
  var th: Thread[(SharedChannel[float], int)]
  createThread(th, calcPartialPi, (r, iter))
can work and I don't know why you don't simply use spawn either. You cannot blindly translate Go's concurrency primitives to Nim.
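For comparison, a spawn-based version without channels might look roughly like the sketch below. It is only an illustration under some assumptions: the names partialPi and calcPiSpawn are made up here, it uses initRand and rand from the newer std/random module instead of the math.random call in the code above, and the per-task seeds are arbitrary. Each task returns its partial result through a FlowVar, so nothing blocks on a channel inside the thread pool. Compile with --threads:on.

```nim
import std/[threadpool, random]

# Hypothetical helper: estimates pi from `iter` random points,
# using a private generator so tasks do not share RNG state.
proc partialPi(iter: int, seed: int64): float =
  var rng = initRand(seed)
  var count = 0
  for _ in 1 .. iter:
    let x = rng.rand(1.0)
    let y = rng.rand(1.0)
    if x * x + y * y < 1.0:
      inc count
  result = 4.0 * count.float / iter.float

# Hypothetical driver: one spawned task per partial estimate.
proc calcPiSpawn(iter, tasks: int): float =
  var parts = newSeq[FlowVar[float]](tasks)
  for i in 0 ..< tasks:
    # Arbitrary non-zero seed, different for every task.
    parts[i] = spawn partialPi(iter, int64(i + 1) * 7919)
  for p in parts:
    result += ^p   # ^ blocks until that task's result is available
  result /= tasks.float

when isMainModule:
  echo calcPiSpawn(50000, 8)
```

The number of tasks is independent of the pool size here, because a task never waits for another task.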
As per this thread, using spawn together with channels will not work properly. The reason is that many operations in channels are blocking, and if more threads are blocked than the threadpool size, everything will deadlock.
The reason I am copying Go's concurrency primitives is that I am running a benchmark of various channel implementations across languages with respect to Go. I already have one for Nim that uses golib-nim, but I wanted to try one with the stdlib channels as well.
Your problem is that you're storing all thread information in a single thread variable. That variable contains the actual state of the thread, not just an id (unlike, say, pthread_create()). Each time you create a new thread, you smash the previous threads' state. Use a seq[Thread[T]] instead and give each thread its own block in that sequence (and don't forget to use joinThread() at the end).
It's also not a good idea to have the thread state in a local variable unless you can be sure that the thread exits (again, joinThread()) before the function containing the variable returns.
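A minimal sketch of that advice, with one Thread object per worker and a join before the owning proc returns. The names worker, results and runWorkers are invented for this illustration, and the channel is a module-level variable to keep the sketch short, which is a different choice from the shared pointer used elsewhere in this thread. Compile with --threads:on.

```nim
# Module-level channel, visible to every thread.
var results: Channel[int]

# Hypothetical worker: reports the square of its id.
proc worker(id: int) {.thread.} =
  results.send(id * id)

proc runWorkers(n: int): int =
  # One Thread object per worker; each keeps its own state.
  var threads = newSeq[Thread[int]](n)
  results.open()
  for i in 0 ..< n:
    createThread(threads[i], worker, i + 1)
  for _ in 1 .. n:
    result += results.recv()
  # Wait for every worker before `threads` goes out of scope.
  joinThreads(threads)
  results.close()

when isMainModule:
  echo runWorkers(4)   # 1 + 4 + 9 + 16
```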
I played with that code and made a version which seems to work fine as demonstration. I added some code which makes joinThread() show its effect.
import times, math, os

# for auto dereferencing of ptr in SharedChannel
{.experimental.}

type SharedChannel[T] = ptr Channel[T]

proc newSharedChannel[T](): SharedChannel[T] =
  result = cast[SharedChannel[T]](allocShared0(sizeof(Channel[T])))
  open(result[])

proc close[T](ch: var SharedChannel[T]) =
  close(ch[])
  deallocShared(ch)
  ch = nil

proc calcPartialPi(s: tuple[r: SharedChannel[float], iter: int]) =
  var (r, iter) = s
  var count = 0
  for _ in 1 .. iter:
    let (x, y) = (random(1.0), random(1.0))
    if (x * x + y * y) < 1:
      count += 1
  r.send(4.0 * count.float / iter.float)
  sleep(5000) # just for demonstration

proc calcPi(iter, processes: int): float =
  let s = epochTime()
  var acc = 0.0
  var r = newSharedChannel[float]()
  var threads = newSeq[Thread[(SharedChannel[float], int)]](processes)
  echo "starting threads"
  for t in mitems(threads):
    createThread(t, calcPartialPi, (r, iter))
  echo "all threads started"
  echo "receiving results"
  for _ in 1 .. processes:
    acc += r.recv()
  echo "received all"
  let time1 = epochTime() - s
  echo "Time elapsed: ", time1
  echo acc / processes.float
  echo "waiting on all threads to finish"
  joinThreads(threads)
  echo "threads done"
  let time2 = epochTime() - s
  echo "Time elapsed: ", time2
  return acc / processes.float

proc main() =
  const
    iter = 500000
    processes = 500
  echo calcPi(iter, processes)

when isMainModule:
  main()
Really nice example.
I tried something with channels and threads myself from scratch (after reading that channels with spawn can cause problems), but unfortunately I could not get it to compile. (And gvim's error display does not work at all with code that needs the --threads:on argument.) It took a while to find your example, but it works fine and I can use it now for my own experiments. :-)
You define proc close, but do not use it?
I don't know if Nim channels can work bidirectionally, but somehow bidirectional message passing should be possible. So I tried this. Is it fundamentally wrong, or did it just not work? (It seems to block.)
# for auto dereferencing of ptr in SharedChannel
{.experimental.}

type SharedChannel[T] = ptr Channel[T]

proc newSharedChannel[T](): SharedChannel[T] =
  result = cast[SharedChannel[T]](allocShared0(sizeof(Channel[T])))
  open(result[])

proc close[T](ch: var SharedChannel[T]) =
  close(ch[])
  deallocShared(ch)
  ch = nil

proc calcPartialPi(s: tuple[r: SharedChannel[float], iter: int]) =
  var (r, iter) = s
  var acc = r.recv()
  r.send(4.0 + acc)
  #sleep(5000) # just for demonstration

proc calcPi(iter, processes: int): float =
  var r = newSharedChannel[float]()
  var thread: Thread[(SharedChannel[float], int)]
  createThread(thread, calcPartialPi, (r, iter))
  r.send(2.0)
  var acc = r.recv()
  joinThreads(thread)
  return acc

proc main() =
  echo calcPi(0, 0)

when isMainModule:
  main()
Using two channels it seems to work fine:
import os

# for auto dereferencing of ptr in SharedChannel
{.experimental.}

type SharedChannel[T] = ptr Channel[T]

proc newSharedChannel[T](): SharedChannel[T] =
  result = cast[SharedChannel[T]](allocShared0(sizeof(Channel[T])))
  open(result[])

proc close[T](ch: var SharedChannel[T]) =
  close(ch[])
  deallocShared(ch)
  ch = nil

proc calcPartialPi(s: tuple[r1, r2: SharedChannel[float], iter: int]) =
  var (r1, r2, iter) = s
  var acc = r1.recv()
  r2.send(4.0 + acc)
  #sleep(5000) # just for demonstration

proc calcPi(iter, processes: int): float =
  var r1 = newSharedChannel[float]()
  var r2 = newSharedChannel[float]()
  var thread: Thread[(SharedChannel[float], SharedChannel[float], int)]
  createThread(thread, calcPartialPi, (r1, r2, iter))
  r1.send(2.0)
  var acc = r2.recv()
  joinThreads(thread)
  return acc

proc main() =
  echo calcPi(0, 0)

when isMainModule:
  main()
Is it a general property of channels that they work in only one direction? I was not able to find information about this in the manual or with Google.
[EDIT]
OK, Rust channels are for unidirectional flow of information, so I guess that is the case for Nim too. So it is not a bug, and we have to use two distinct channels?
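One possible explanation for the blocking, offered as a guess rather than something taken from the manual: a Channel is a single message queue, not a pair of endpoints, so whichever thread calls recv first gets the next message, including the thread that sent it. In the single-channel version the main thread can therefore take back its own 2.0 before the worker sees it, and the worker then waits forever. The small sketch below (the proc name selfReceive is made up) shows only the first half of that claim, a sender reading its own message. Compile with --threads:on.

```nim
# Hypothetical demonstration: send and receive on the same
# channel from the same thread.
proc selfReceive(): tuple[ok: bool, msg: float] =
  var ch: Channel[float]
  ch.open()
  ch.send(2.0)
  # tryRecv does not block; it reports whether a message was available.
  let (available, value) = ch.tryRecv()
  ch.close()
  result = (available, value)

when isMainModule:
  echo selfReceive()
```

With two channels each side only ever receives on the channel the other side sends on, which avoids that race.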