Hi there,
I’m trying to start many processes and asynchronously wait for their completion, using the asynctools package. This works for running one process, but crashes when a second process is started before the first ends. Minimal example:
import asyncdispatch, asynctools/asyncproc
proc run() {.async.} =
while true:
await sleepAsync(300)
var p = startProcess("sleep 1 && ls", options={poUsePath, poEvalCommand})
asyncCheck p.waitForExit()
waitFor run()
Crashes with the error:
Error: unhandled exception: Resource temporarily unavailable (code: 11) [IOSelectorsException]
Does anyone have experience doing this with asynctools, or some other way?
also crashes for me. It seems that waitForExit is buggy. Maybe open a bug report in asynctools.
I've done this as well in the past, but it is a SUPER HACKY solution (but it's still working for me, in a non mission critical application).
proc callAsync*(cmd: string, arguments: seq[string]): Future[int] {.async.} =
var pr = startProcess(cmd.quoteShell, args = arguments)
while true:
result = pr.peekExitCode
if result != -1:
break
await sleepAsync(250)
# ....
if (await callAsync( convertCommand, args )) == 0:
return thumbPath
else:
return
Yeah, I think this is a bug in asynctools. I recall running into similar things, I created an issue for this a while ago: https://github.com/cheatfate/asynctools/issues/9.
If you have some time it would be brilliant if you could look into the cause and fix it :)
I don't think it's a bug in asynctools but either in bug in asyncdispatch.addProcess or an OS limitation:
In my quest to replace GNU parallel that was extremely annoying in CI, I wrote a parallel shell command runner and needed to not start more processes than CPU cores. And automatically schedule the next once one finished, I used addProcess callback: https://github.com/mratsim/constantine/blob/df04811/helpers/pararun.nim#L73-L82
proc enqueuePendingCommands(wq: WorkQueue) {.async.} =
while wq.cmdQueue.len > 0:
await wq.sem.acquire() # Semaphore: await until a free CPU core is available
let cmd = wq.cmdQueue.popFirst()
let p = cmd.startProcess(
options = {poStdErrToStdOut, poUsePath, poEvalCommand}
)
p.processID.addProcess do (fd: AsyncFD) -> bool:
wq.sem.release() # Callback: release a semaphore slot on process exit
wq.outputQueue.putNoWait((cmd, p))
Afterwards, any interaction with the process, like reading its stdout would trigger Error: unhandled exception: Resource temporarily unavailable (code: 11) [IOSelectorsException]
If you look into waitForExit implementation (Windows and POSIX, they require addProcess as well
proc waitForExit*(p: AsyncProcess): Future[int]
## Waits for the process to finish in asynchronous way and returns
## exit code.
when defined(windows):
when declared(addProcess):
proc waitForExit(p: AsyncProcess): Future[int] =
var retFuture = newFuture[int]("asyncproc.waitForExit")
proc cb(fd: AsyncFD): bool =
var value = 0'i32
let res = getExitCodeProcess(p.fProcessHandle, value)
if res == 0:
retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
else:
p.isExit = true
p.exitCode = value
retFuture.complete(p.exitCode)
if p.isExit:
retFuture.complete(p.exitCode)
else:
addProcess(p.procId, cb)
return retFuture
else:
when declared(addProcess):
proc waitForExit*(p: AsyncProcess): Future[int] =
var retFuture = newFuture[int]("asyncproc.waitForExit")
proc cb(fd: AsyncFD): bool =
var status = cint(0)
let res = posix.waitpid(p.procId, status, WNOHANG)
if res <= 0:
retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
else:
p.isExit = true
p.exitCode = statusToExitCode(status)
retFuture.complete(p.exitCode)
if p.isExit:
retFuture.complete(p.exitCode)
else:
while true:
var status = cint(0)
let res = posix.waitpid(p.procId, status, WNOHANG)
if res < 0:
retFuture.fail(newException(OSError, osErrorMsg(osLastError())))
break
elif res > 0:
p.isExit = true
p.exitCode = statusToExitCode(status)
retFuture.complete(p.exitCode)
break
else:
try:
addProcess(p.procId, cb)
break
except:
let err = osLastError()
if cint(err) == ESRCH:
continue
else:
retFuture.fail(newException(OSError, osErrorMsg(err)))
break
return retFuture
I tried to hack something light with the same purpose a few month ago and just had to give up async. A few thing don't work on Windows, some things have been know to be just buggy on Linux too for a long time.
Just used threads in the end, but stumbled upon threadpool being buggy on ARC/ORC, broken blockUntilAny and so on along the way.