I recently found Nim and have been trying it on a small program. So far, Nim looks like a nice language to use. Currently I'm looking into its parallel/concurrent features.
What I'm looking for is fairly simple. I need some worker threads that get assigned some work (which can be just a string) and give back some result (which can also be a string). The workers keep on looping to fetch work and execute it.
So I need to have some form of communication between the main program/thread and the worker threads. The manual mentions:
For safe data exchange ... a global TChannel needs to be used ...
But I'm not sure how to use TChannel correctly. I'm trying it on the very simple test program below, but cannot get it to work. It has 3 channels: one for each consumer that gets spawned.
import threadpool
type StringChannel = TChannel[string]
var channels : array[1..3, StringChannel]
proc consumer(ix : int, channel : StringChannel) {.thread.} =
  echo channels[ix].recv() # <--- not GC-safe: 'channels'
  echo channel.recv() # <--- a 'var' type variable
                      #      needs to be passed
proc main =
  for ix in 1..3: channels[ix].open()
  for ix in 1..3: spawn consumer(ix, channels[ix])
  for ix in 1..3: channels[ix].send("test")
  sync()
  for ix in 1..3: channels[ix].close()
when isMainModule:
  main()
Whichever way I try to access a channel from within a consumer I hit some compiler error:
When I pass a channel as a parameter, the compiler says it wants a var parameter, but that does not work with spawn.
When I try to access a global, it is marked as GC-unsafe.
Does anybody know what I'm doing wrong? Or should I be using a different strategy?
import threadpool
type StringChannel = TChannel[int]
var channels : array[1..3, StringChannel]
proc consumer(ix : int, channel : StringChannel) {.thread.} =
  echo channels[ix].recv() # <--- not GC-safe: 'channels'
proc main =
  for ix in 1..3: channels[ix].open()
  for ix in 1..3: spawn consumer(ix, channels[ix])
  for ix in 1..3: channels[ix].send(10)
  sync()
  for ix in 1..3: channels[ix].close()
when isMainModule:
  main()
Edit2: On the other hand it works with a single channel of a string instead of an array of channels of strings, so this clearly looks like a bug:
import threadpool
type StringChannel = TChannel[string]
var channel: StringChannel
proc consumer(ix : int) {.thread.} =
  echo channel.recv()
proc main =
  channel.open()
  spawn consumer(0)
  channel.send("foo")
  sync()
  channel.close()
when isMainModule:
  main()
Accessing a global TChannel works, and it should also work when the channel is in a global array of channels, but it doesn't. So: bug report, please!
EDIT But passing it as a 'ptr TChannel[string]' should be a valid workaround.
It seems to work with the pointer. I also tried passing a pointer to the array and keeping the index, which worked too, with the same result as this version.
BTW: Is that warning supposed to go away?
import threadpool
type StringChannel = TChannel[string]
var channels : array[1..3, StringChannel]
proc consumer(channel: ptr StringChannel) {.thread.} =
  echo channel[].recv() ###### not GC-safe: 'channels'
proc main =
  for ix in 1..3: channels[ix].open()
  for ix in 1..3: spawn consumer(addr(channels[ix]))
  for ix in 1..3: channels[ix].send("test")
  sync()
  for ix in 1..3: channels[ix].close()
when isMainModule:
  main()
Gives
parallel_gcunsafe.nim(10, 18) Warning: not GC-safe: 'channels' [GcUnsafe]
parallel_gcunsafe.nim(11, 38) Warning: not GC-safe: 'channels' [GcUnsafe]
parallel_gcunsafe.nim(12, 18) Warning: not GC-safe: 'channels' [GcUnsafe]
parallel_gcunsafe.nim(14, 18) Warning: not GC-safe: 'channels' [GcUnsafe]
parallel_gcunsafe.nim(17, 6) Warning: not GC-safe: 'main()' [GcUnsafe]
test
test
test
All, thanks a lot for trying to help me out on this. It's nice to know that it is the intention that such programs should work.
@Araq: As suggested, I did file an issue on github for this.
@OderWat: I'm trying out your version. It does compile OK, but with the warnings as you see too.
But on my system (Ubuntu, Nim version 0.10.3) it does not run correctly. It freezes at the first echo channel[].recv(). It does work when I send something to the channels before spawning the consumer threads, but every recv on an empty channel seems to freeze all threads.
I'll try to investigate a bit more what is going on.
EDIT I just tried the program (using the ptr solution) on multiple machines. Unfortunately, it sometimes works and sometimes freezes at the recv part and locks up all threads. An easy way to make it freeze is to raise the number of consumers, e.g. to 12.
After some more investigating, I think that the freezing is unrelated to the original multiple channels problem.
I made a version of the program with only 1 channel:
import threadpool, strutils
var channel : TChannel[string]
proc consumer(ix : int) {.thread.} =
  echo intToStr(ix).align(2) & ": " & channel.recv()
proc main =
  channel.open()
  for ix in 1..22: spawn consumer(ix)
  for ix in 1..22: channel.send("foo")
  sync()
  channel.close()
when isMainModule:
  main()
Also this program sometimes works and sometimes freezes. So, I guess there must be some lock/synchronization problems somewhere.
No, it's just that you didn't understand spawn, and I think its docs are wrong or misleading. spawn is not createThread. createThread really creates a thread and as such will guarantee it will return. spawn simply waits for a thread in the thread pool to become available for the task. But you block every single one of the available worker threads with the recv, so once the pool is exhausted, the next spawn waits forever and the main thread never reaches its send calls.
In theory the compiler is allowed to remove the spawn with no observable change in behaviour, so for compile-time evaluation we could simply ignore it. (We don't support compile-time evaluation of spawn yet, however.)
In summary: TChannel has been designed for TThread, spawn plus a channel is wrong.
EDIT This doesn't mean the threadpool is free of real bugs, the following test had to be disabled for our Linux test machines:
discard """
output: '''50005000'''
disabled: "true"
"""
# XXX this seems to deadlock certain Linux machines
import threadpool, strutils
proc foo(x: int): string = $x
proc main() =
  var a = newSeq[int]()
  for i in 1..10000:
    add(a, i)
  var s = 0
  for i in a:
    s += parseInt(^spawn(foo(i)))
  echo s
setMaxPoolSize 2
parallel:
  spawn main()
That makes a lot of sense and clarifies a lot. Thanks for the explanation.
Knowing this, I should have been using createThread from the start for the kind of communicating processes I'm looking for.
Using createThread and the ptr workaround, I now managed to get the simple program to work OK.
import strutils
type
  StringChannel = TChannel[string]
var
  channels : array[0..3, StringChannel]
  thr: array [0..3, TThread[ptr StringChannel]]
proc consumer(channel: ptr StringChannel) {.thread.} =
  echo channel[].recv()
  channel[].send("fighters")
proc main =
  for ix in 0..3: channels[ix].open()
  for ix in 0..3: createThread(thr[ix], consumer, addr(channels[ix]))
  for ix in 0..3: channels[ix].send("foo (" & intToStr(ix) & ")")
  joinThreads(thr)
  for ix in 0..3: echo channels[ix].recv()
  for ix in 0..3: channels[ix].close()
when isMainModule:
  main()
The compiler still gives the not GC-safe warning, but the program is working correctly. So, I'll go for this approach in the real program I'm writing.
@def, @Araq, @OderWat: Thanks a lot for helping me out on this in such a short timeframe. It's much appreciated!
The ticket associated with this forum thread was closed 2 weeks ago ( https://github.com/Araq/Nim/issues/2257 ).
If I raise the value of ch_max high enough in the program below, I can reliably get the program to freeze.
Running on Ubuntu 14.04.2 LTS.
I'm just learning Nim, so take this with a grain of salt.
I added a comment on the ticket too.
import threadpool
const ch_max:int = 3
var channels: array[1..ch_max, TChannel[string]]
proc consumer(ix : int) {.thread.} =
  echo "received: " & channels[ix].recv()
proc main =
  for ix in countup(1,ch_max):
    echo "opening: " & $ix & "/" & $ch_max
    channels[ix].open()
  for ix in countup(1,ch_max):
    echo "spawning: " & $ix & "/" & $ch_max
    spawn consumer(ix)
  for ix in countup(1,ch_max):
    echo "sending: " & $ix & "/" & $ch_max
    channels[ix].send("channel recv " & $ix)
  sync()
  echo "synced"
  for ix in countup(1,ch_max): channels[ix].close()
when isMainModule:
  echo "<<begin>>"
  main()
  echo "<<end>>"
"In summary: TChannel has been designed for TThread, spawn plus a channel is wrong."
I think I didn't read the line quoted above closely enough before posting my previous message.
I will update my comment on the ticket.
How do you pass multiple args to threads?
I have this:
import threadpool, os
const concurrent = 2
type StringChannel = TChannel[string]
type Worker = TThread[ptr StringChannel, int]
var channels:array [0..concurrent, StringChannel]
var threads:array [0..concurrent, Worker]
proc do_work(channel: ptr StringChannel, i : int) {.thread.} =
  channel[].open()
  while true:
    channel[].send("thread = " & char(i))
proc start_threads =
  for i, t in threads:
    createThread(threads[i], do_work, addr(channels[i]), i)
when isMainModule:
  start_threads()
  while true:
    for i, c in channels:
      os.sleep(1000)
      echo channels[i].recv()
which gives
Error: cannot instantiate TThread
got: (typedesc[ptr StringChannel], typedesc[int])
but expected: (TArg)
so confused :C
How do you pass multiple args to threads?
As a tuple or an object I guess:
# This is a comment
import threadpool, os
const concurrent = 2
type StringChannel = TChannel[string]
type Worker = TThread[(ptr StringChannel, int)]
var channels:array [concurrent, StringChannel]
var threads:array [concurrent, Worker]
proc do_work(t: tuple[channel: ptr StringChannel, i : int]) {.thread.} =
  t.channel[].open()
  while true:
    t.channel[].send("thread = " & $t.i)
proc start_threads =
  for i, t in threads:
    createThread(threads[i], do_work, (addr(channels[i]), i))
when isMainModule:
  start_threads()
  while true:
    for i, c in channels:
      os.sleep(1000)
      echo channels[i].recv()
As a tuple or an object I guess:
Aye, that doesn't work; it gives this:
Error: cannot instantiate TThread
got: (tuple[typedesc[ptr StringChannel], typedesc[int]])
but expected: (TArg)
What's a TArg? :S By the way, what does the $t.i do?
It works with the current devel branch, I forgot that anonymous tuples weren't in 0.10.2. This should work in 0.10.2:
# This is a comment
import threadpool, os
const concurrent = 2
type StringChannel = TChannel[string]
type Arg = tuple[channel: ptr StringChannel, i: int]
type Worker = TThread[Arg]
var channels:array [concurrent, StringChannel]
var threads:array [concurrent, Worker]
proc do_work(t: Arg) {.thread.} =
  t.channel[].open()
  while true:
    t.channel[].send("thread = " & $t.i)
proc start_threads =
  for i, t in threads:
    createThread(threads[i], do_work, (addr(channels[i]), i))
when isMainModule:
  start_threads()
  while true:
    for i, c in channels:
      os.sleep(1000)
      echo channels[i].recv()
whats a TArg :S
Thread argument I guess.
btw what does the $t.i do ?
$ is the convention for procs that convert anything to a string. So this makes the number t.i into a string. char(t.i) returns an unprintable character (with binary value 0 or 1 here).
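To make the difference concrete, here is a minimal sketch (the variable names are made up for illustration and are not from the programs above). It assumes a reasonably recent Nim compiler and only uses `$`, `char` and `ord` from the system module:

```nim
# `$` turns a value into its printable string form;
# char(i) reinterprets the number as a character code instead.
let i = 1
let asText = $i        # the string "1"
let asChar = char(i)   # the control character with code 1, not the digit '1'

echo "thread = " & asText   # prints: thread = 1
echo ord(asChar)            # prints: 1 (the character code, shown as a number)
```

So `"thread = " & char(i)` compiles, but appends a character you cannot see in a terminal, which is probably not what was intended.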
Yay, it works! Sorry, I'm a complete noob to Nim; I only started today.
Can I pick your brain about something?
Let's take this proc:
proc start_threads =
  for i, t in threads:
    createThread(threads[i], do_work, (addr(channels[i]), i))
Why is it that when I change that to
proc start_threads =
  for i, t in threads:
    createThread(t, do_work, (addr(channels[i]), i))
it errors out with
Error: for a 'var' type a variable needs to be passed
or when I do
proc start_threads =
  for i, t in threads:
    var ch : StringChannel = channels[i]
    createThread(threads[i], do_work, (addr(ch), i))
it gives
SIGSEGV: Illegal storage access. (Try to compile with -d:useSysAssert -d:useGcAssert for details.)
Error: execution of an external program failed
Error: for a 'var' type a variable needs to be passed
This error tells you that t is immutable, but has to be mutable as the definition of createThread requires. When you write for i, t in threads: the pairs iterator is called, which returns immutable values. With the current devel version you can use mpairs instead, like this:
proc start_threads =
  for i, t in threads.mpairs:
    createThread(t, do_work, (addr(channels[i]), i))
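The same rule can be seen without any threads. The following is only a sketch with a made-up array called `counters`; it assumes a Nim version that provides `mpairs` for arrays (as mentioned above, 0.10.2 did not):

```nim
var counters = [1, 2, 3]

# With a plain `for i, c in counters:` the loop variable `c` is immutable,
# so assigning to it is rejected by the compiler.
# mpairs yields each element as a mutable location instead:
for i, c in counters.mpairs:
  c = c * 10       # writes through to counters[i]

echo counters      # prints: [10, 20, 30]
```

That is why `createThread(t, ...)` is accepted inside an `mpairs` loop: `t` then refers to the actual array slot, which is what a `var` parameter needs.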
SIGSEGV: Illegal storage access. (Try to compile with -d:useSysAssert -d:useGcAssert for details.)
Error: execution of an external program failed
With var ch = channels[i] you copy the channel into a new ch variable. You then pass a pointer to this stack variable to the thread you create. When the start_threads proc ends, ch stops existing, so addr(ch) now points to something invalid. To get a stacktrace you should compile without -d:release.
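The copy half of that explanation can be shown safely with plain integers. This is a hypothetical sketch (`slots`, `bump` and `local` are invented names), and it deliberately does not reproduce the dangling pointer itself, since that would be undefined behaviour:

```nim
var slots = [10, 20, 30]    # global storage, alive for the whole program

proc bump(p: ptr int) =
  p[] += 1                  # modifies whatever the pointer refers to

var local = slots[0]        # this copies the value, like `var ch = channels[i]`
bump(addr(local))           # changes only the copy
bump(addr(slots[1]))        # changes the array element itself

echo slots                  # prints: [10, 21, 30]
echo local                  # prints: 11
```

In the thread program the situation is the same, except that the copy lives on the stack of start_threads, so the pointer to it becomes invalid as soon as the proc returns. Taking the address of the global array element, as in addr(channels[i]), avoids both problems.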