While playing with threads, I found some strange behaviour of `spawn`. I made a simple multi-threading test around the routine `proc multithreadedPrint(sMsg: string, nCount: int)`, which prints the given string `nCount` times. Each character of the string is printed by its own thread, so the number of spawned threads equals the length of the string (in Unicode characters).
The strange part: when I spawn 8 or fewer threads (tested on a 4-core CPU), everything works perfectly, but when I spawn 9 or more threads, the app never finishes.
```nim
import unicode, threadpool, locks, os

proc lenU*(s: string): int =
  result = s.runeLen

proc charAtPosU*(s: string, pos: int): string =
  assert(pos >= 0 and pos < s.runeLen)
  result = s.runeAtPos(pos).toUTF8()

proc multithreadedPrint(sMsg: string, nCount: int) =
  var
    nLen = sMsg.lenU
    nCallsTotal = 0
    nCallsCur = 0
    lk: Lock
    res = 0

  proc worker(c: string, value: int) {.gcsafe.} =
    while true:
      acquire(lk)
      try:
        if nCallsCur == nCallsTotal:
          return
        if res == value:
          inc res
          res = res mod nLen
          inc nCallsCur
          stdout.write c
        else:
          sleep(1)
      finally:
        release(lk)

  if nLen > 0:
    if nLen > MaxDistinguishedThread:
      echo "Your string is too long. Maximum allowed is ", MaxDistinguishedThread, "!"
      return
    setMinPoolSize(nLen)
    setMaxPoolSize(nLen)
    initLock(lk)
    try:
      nCallsTotal = nLen * nCount
      echo("Total threads: ", nLen)
      echo("Total calls: ", nCallsTotal)
      for i in 0..<nLen:
        spawn worker(sMsg.charAtPosU(i), i)
      sync()
    finally:
      deinitLock(lk)

multithreadedPrint("0123456789", 2)
```
Interesting addition:
I just tested the same code on Ubuntu 16.04 under VirtualBox with 2 virtual CPUs.
There it works for strings of up to 5 characters.
With strings of 6, 7, 8 ... characters it hangs every single time.
The problem is that threads go to sleep while still holding a lock.
Well, the real problem is that afaik there is no guarantee that this code will ever finish, because the scheduler is not required to ever wake the right thread when a bunch of other threads are still waiting. But anyway, moving the sleep call outside the locked section seems to work fine in practice:
```nim
import unicode, threadpool, locks, os

proc lenU*(s: string): int =
  result = s.runeLen

proc charAtPosU*(s: string, pos: int): string =
  assert(pos >= 0 and pos < s.runeLen)
  result = s.runeAtPos(pos).toUTF8()

proc multithreadedPrint(sMsg: string, nCount: int) =
  var
    nLen = sMsg.lenU
    nCallsTotal = 0
    nCallsCur = 0
    lk: Lock
    res = 0

  proc worker(c: string, value: int) {.gcsafe.} =
    while true:
      var found = true
      acquire(lk)
      try:
        if nCallsCur == nCallsTotal:
          return
        if res == value:
          inc res
          res = res mod nLen
          inc nCallsCur
          stdout.write c
        else:
          found = false
      finally:
        release(lk)
      if not found: sleep(1)

  if nLen > 0:
    if nLen > MaxDistinguishedThread:
      echo "Your string is too long. Maximum allowed is ", MaxDistinguishedThread, "!"
      return
    setMinPoolSize(nLen)
    setMaxPoolSize(nLen)
    initLock(lk)
    try:
      nCallsTotal = nLen * nCount
      echo("Total threads: ", nLen)
      echo("Total calls: ", nCallsTotal)
      for i in 0..<nLen:
        spawn worker(sMsg.charAtPosU(i), i)
      sync()
    finally:
      deinitLock(lk)

multithreadedPrint("0123456789", 2)
```
> The problem is that threads go to sleep while still holding a lock.
Yes, that was my mistake. Thanks! But even after moving `sleep` to the right place, everything still hangs with 6+ characters.
> Well, the real problem is that afaik there is no guarantee that this code will ever finish, because the scheduler is not required to ever wake the right thread when a bunch of other threads are still waiting.
I think this shouldn't be the case.
Even with 100+ threads, an OS thread scheduler gives each one a chance to execute.
So even without any sleep the whole job should definitely finish, just at 100% CPU load.
P.S. I just tested without sleep: everything finishes for 1-5 characters, at 100% CPU load. It seems there's some other reason for the hang.
Well, it works for me with 32 characters in the string (Linux, 4 CPUs). So it seems to depend a bit on the OS, and therefore on the scheduler.
> I think this shouldn't be the case.
I'll try to explain what might happen. I don't know all the internals involved, so this may or may not be accurate.
Suppose all the threads have been started and we are at some point x where one thread has just finished writing its character. That thread releases the lock, and the scheduler thinks: "Ah, the lock has been released and I have a bunch of threads waiting for it, so let's wake the next in line. And because I have multiple CPUs, let's start several threads directly."
The next thread in line (let's call it T1) runs and acquires the lock. Unfortunately, it is not the thread that can write the next character, so it releases the lock and sleeps for 1 ms. Meanwhile, another thread (T2) has been started and tries to acquire the lock, but T1 is still holding it. So T2 tells the scheduler it is still blocked by the lock and goes back to sleep, and the scheduler moves it to the back of the queue of blocked threads. Now, unfortunately, T2 is actually the thread that can write the next character, but the scheduler will run all the other threads first. Then, if you're lucky, T2 finally runs because all the other threads are currently sleeping.
This, however, only happens if the scheduler can perform all the necessary context switches, executions, sleeps etc. in less than 1 ms (the time the other threads sleep). Depending on your system, that may or may not be enough time (I don't have numbers on context-switch costs at hand, so I can only guess). If a round trip through all the threads takes more than 1 ms, T2 may be blocked again by another thread when its turn comes, and the same cycle repeats. So there is no guarantee that T2 will ever run at a moment when it can acquire the lock.
You could try increasing the sleep time. But all in all, this code is outright horrible because of those interdependencies, and you should never do something like that in production code.
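For comparison, a common way to avoid this sleep-and-retry polling altogether is a condition variable: each thread blocks until it is its turn, and whoever writes a character wakes the others. The sketch below is C++ (like the test later in this thread), not Nim; `TurnState`, `turnWorker` and `orderedPrint` are hypothetical names, and it collects the output in a string instead of writing to stdout so the result can be checked. Nim's `locks` module also has a `Cond` type that could serve the same purpose, but that version is not shown here.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Shared state, passed explicitly to every worker instead of through globals.
struct TurnState {
    std::mutex m;
    std::condition_variable cv;
    std::size_t n = 0;      // number of worker threads
    std::size_t total = 0;  // characters to write in total (n * rounds)
    std::size_t turn = 0;   // index of the thread allowed to write next
    std::size_t done = 0;   // characters written so far
    std::string out;        // collected output instead of stdout
};

// Each worker blocks on the condition variable until it is its turn,
// instead of repeatedly taking the lock and sleeping for 1 ms.
void turnWorker(TurnState* s, char ch, std::size_t idx) {
    std::unique_lock<std::mutex> lk(s->m);
    while (true) {
        // wait() releases the mutex while blocked and re-checks the predicate
        // under the mutex after every wake-up (including spurious ones).
        s->cv.wait(lk, [&] { return s->done == s->total || s->turn == idx; });
        if (s->done == s->total) return;  // all work finished
        s->out += ch;
        ++s->done;
        s->turn = (s->turn + 1) % s->n;
        s->cv.notify_all();  // wake all waiters; only the next index proceeds
    }
}

// Starts one thread per character of `symbols` and lets them write in strict
// round-robin order, `rounds` times; returns what was written.
std::string orderedPrint(const std::string& symbols, std::size_t rounds) {
    TurnState s;
    s.n = symbols.size();
    s.total = s.n * rounds;
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < s.n; ++i)
        threads.emplace_back(turnWorker, &s, symbols[i], i);
    for (auto& t : threads) t.join();
    return s.out;
}
```

Because a waiting thread only wakes when the shared state changes and then re-checks `turn` while holding the mutex, the result no longer depends on the scheduler cycling through all threads within 1 ms. `notify_all` wakes every waiter, which is wasteful with many threads, but it keeps the sketch simple.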
Hi flux!
Thank you for your interest in this problem.
Well...
Everything you write is almost perfect, but in practice, if we run 10-20 threads, the scheduler wakes T1, T2, ... T19 in a cycle, and it should work whether we sleep for 1 ms or not at all. You are right that when the scheduler tries to run Tx it can be blocked, but here is the tricky part: all threads are waiting for a single resource, so there will eventually be a moment when the lucky Tx does its work.
And of course the code is bad, but it is deliberately bad: the point is to check whether each thread gets a chance to execute.
Although I was sure it should work, I wrote the same test in C++ (this code uses a thread pool I wrote some time ago, to simulate logic like Nim's spawn). It spawns 40 threads and prints 80 characters in order (with and without sleep).
P.S. After comparing this C++ code to Nim, I started to love Nim even more (ThreadPool.h alone has 180+ lines of C++ code). :-)
P.P.S. After all this testing, I still suspect that something is wrong with Nim's threading.
```cpp
#include <chrono>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include "ThreadPool.h"  // my own CThreadPool (not shown here)

size_t g_nLen;
size_t g_nCallsTotal = 0;
size_t g_nCallsCur = 0;
size_t g_nRes = 0;
std::mutex g_mutex;
std::stringstream g_msg;

void worker(char ch, size_t nValue) {
    while (true) {
        bool bWorkDone = true;
        {
            // The lock is held only inside this block, never while sleeping.
            std::unique_lock<std::mutex> lck(g_mutex);
            if (g_nCallsCur == g_nCallsTotal)
                return;
            if (g_nRes == nValue) {
                g_nRes++;
                g_nRes %= g_nLen;
                g_nCallsCur++;
                g_msg << ch;
            } else {
                bWorkDone = false;
            }
        }
        if (!bWorkDone) {
            std::chrono::milliseconds sleepDuration(1);
            std::this_thread::sleep_for(sleepDuration);
        }
    }
}

int main(int argc, char* argv[]) {
    const std::string sText = "0123456789012345678901234567890123456789";
    g_nLen = sText.size();
    g_nCallsTotal = g_nLen * 2;
    CThreadPool *pPool = new CThreadPool(g_nLen);
    for (size_t i = 0; i < g_nLen; i++)  // size_t avoids a signed/unsigned comparison
        pPool->executeInThread(&worker, sText[i], i);
    pPool->waitUntilFree();
    delete pPool;
    std::cout << g_msg.str() << std::endl << "Finished!!" << std::endl;
    return 0;
}
```
Well, I'm done with ideas. If you want to investigate further, I guess gdb or lldb would be able to help you see what's happening.
Anyway, thank you for this discussion!
Yes, I'm out of ideas too. The generated C code is quite big (23982 lines compiled, hmm...), and I'm unfamiliar with the compiler's internals.
My plan was to evaluate whether Nim could be used for some server-side tools in our test environment.
Now I'm really in doubt.
Hi,
I believe the below code does the same thing as you're trying to achieve, though I think I might be misunderstanding your aim - as it stands, there seems to be no reason to use threads whatsoever?
```nim
import unicode, threadpool, locks, os

proc lenU*(s: string): int =
  result = s.runeLen

proc charAtPosU*(s: string, pos: int): string =
  assert(pos >= 0 and pos < s.runeLen)
  result = s.runeAtPos(pos).toUTF8()

var
  lk: Lock

proc printCharInString(r: Rune) =
  lk.acquire()
  defer: lk.release()
  stdout.write($r)

proc multithreadedPrint(msg: string, iters: int) =
  let
    msgLen = msg.lenU
    numCalls = msgLen * iters
  if msgLen > 0:
    if msgLen > MaxDistinguishedThread:
      quit("Your string is too long. Maximum allowed is " & $MaxDistinguishedThread & "!")
    setMinPoolSize(msgLen)
    setMaxPoolSize(msgLen)
    initLock(lk)
    defer: deinitLock(lk)
    echo("Total threads: ", msgLen)
    echo("Total calls: ", numCalls)
    for i in 0..<iters:
      for r in msg.runes():
        spawn printCharInString(r)
    sync()

multithreadedPrint("0123456789", 2)
```
The output for a shorter (6-character) string is the same for my version and yours, and mine finishes with longer strings (such as your 10-character one).
In the original code, the worker proc is a closure, and worker does indeed access some of the variables in the surrounding scope.
I think passing a closure to spawn is not allowed. Since the threads potentially run in separate memory spaces, I'm not even sure how this kind of variable sharing could work in Nim, so this seems like a logical limitation.
The C++ version does not use closures, and neither does @euant's version, so they should not have that problem.
@euant:
> I believe the below code does the same thing as you're trying to achieve, though I think I might be misunderstanding your aim - as it stands, there seems to be no reason to use threads whatsoever?
The idea was to check how easy (or hard) it is to write a Nim app that runs a predefined number of threads in a fixed order.
For example, I want 3 threads, and they should run like this: 123123123 etc.
To verify that everything works properly, each thread prints its own character to the console. So if the threads run out of order, we'll see something like 122313; if thread 2 is never called, the output will be 1313, and so on.
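Judging the order by eye gets hard once there are dozens of threads. A small checker along these lines (C++ again; `isInOrder` is a hypothetical helper, not something from the thread) makes the rule explicit: the output must be exactly `rounds` repetitions of the symbols, so both an out-of-order sequence like 122313 and a missing thread like 1313 are rejected.

```cpp
#include <cstddef>
#include <string>

// Returns true only if `output` is exactly `rounds` repetitions of `symbols`,
// i.e. every thread wrote its character once per round, always in the same order.
bool isInOrder(const std::string& output, const std::string& symbols,
               std::size_t rounds) {
    const std::size_t n = symbols.size();
    if (output.size() != n * rounds) return false;  // a thread was skipped or repeated
    for (std::size_t k = 0; k < output.size(); ++k)
        if (output[k] != symbols[k % n]) return false;  // wrong thread wrote here
    return true;
}
```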
@wiffel, @Araq:
Thanks a lot! I completely missed that spawn can't be used with closures. :-(
So the sample should look like the one below. It's much more verbose, but it works properly.
```nim
import unicode, locks, os

proc lenU*(s: string): int =
  result = s.runeLen

proc charAtPosU*(s: string, pos: int): string =
  assert(pos >= 0 and pos < s.runeLen)
  result = s.runeAtPos(pos).toUTF8()

type
  Container = ref ContainerObj
  ContainerObj = object of RootObj
    Flk: Lock
    FnLen: int
    FnCallsTotal: int
    FnCallsCur: int
    FnCallIndex: int
  WorkerThread = Thread[tuple[cont: Container, c: string, value: int]]

proc newContainer(nLen, nCount: int): Container =
  result.new()
  result.Flk.initLock()
  result.FnLen = nLen
  result.FnCallsTotal = nLen*nCount
  result.FnCallsCur = 0
  result.FnCallIndex = 0

proc isTimeToFinish(self: Container): bool =
  result = self.FnCallsCur == self.FnCallsTotal

proc isOurCallIndex(self: Container, nCallIndex: int): bool =
  result = self.FnCallIndex == nCallIndex

proc goToNextCallIndex(self: Container) =
  inc self.FnCallIndex
  self.FnCallIndex = self.FnCallIndex mod self.FnLen
  inc self.FnCallsCur

proc destroyContainer(cont: var Container) =
  cont.Flk.deinitLock()
  cont = nil

proc worker(param: tuple[cont: Container, c: string, value: int]) {.thread.} =
  while true:
    var bWorkDone = true
    param.cont.Flk.acquire()
    try:
      if param.cont.isTimeToFinish:
        return
      if param.cont.isOurCallIndex(param.value):
        param.cont.goToNextCallIndex()
        stdout.write param.c
      else:
        bWorkDone = false
    finally:
      param.cont.Flk.release()
    if not bWorkDone:
      sleep(1)

proc multithreadedPrint(sMsg: string, nCount: int) =
  # Count Unicode characters (runes), not bytes, so that the index passed
  # to charAtPosU stays valid for non-ASCII strings.
  let nLen = sMsg.lenU
  var
    cont = newContainer(nLen, nCount)
    threads: seq[WorkerThread] = newSeq[WorkerThread](nLen)
  for i in 0..<nLen:
    threads[i].createThread(worker, (cont, sMsg.charAtPosU(i), i))
  joinThreads(threads)
  destroyContainer(cont)

multithreadedPrint("01234567890123456789", 3)
echo ""
```