When I start small, passing pointer of channel seems working fine. But when I grouping logics into proc, I got segmentation fault, illegal storage access or channel only work on one direction, not the other. I cannot reason about the behavior, even after consulting the manual... so I search for help here.
In short, I want to ask
nim version is 0.20.2 on ubunutu
import os, asyncdispatch
type
WorkerChannel*[R,T] = object
rx: ptr Channel[R]
tx: ptr Channel[T]
Worker*[R,T] = proc(args: WorkerChannel[R,T]) {.thread,nimcall.}
proc newWorkerChannel[R,T]( maxTxItems: int = 0, maxRxItems: int = 0): WorkerChannel[R,T] =
var tx: Channel[T]
var rx: Channel[R]
tx.open(maxTxItems)
rx.open(maxRxItems)
result.tx = tx.addr
result.rx = rx.addr
proc twist*[R,T](ch: WorkerChannel[R,T]) : WorkerChannel[T,R] =
WorkerChannel[T,R](tx: ch.rx, rx: ch.tx)
proc send*[R,T](ch: WorkerChannel[R,T], x: T, retyInterval: int = 100): Future[void] {.async.} =
while not ch.tx[].trySend x:
await sleepAsync(retyInterval)
proc sendSync*[R,T](ch: WorkerChannel[R,T], x: T) = ch.tx[].send x
proc recv*[R,T](ch: WorkerChannel[R,T], maxDelay: int = 100): Future[R] {.async.} =
while true:
let (avail, msg)= ch.rx[].tryRecv
if avail: return msg
await sleepAsync(maxDelay)
proc recvSync*[R,T](ch: WorkerChannel[R,T]): R = ch.rx[].recv
proc newWorker*[R,T](taskFunc: Worker[R,T], maxTxItems: int = 0, maxRxItems: int = 0): WorkerChannel[T,R] =
## Worker recieve txType and send rxType
var th : Thread[WorkerChannel[R, T]]
var ch = newWorkerChannel[R, T](maxTxItems=maxTxItems, maxRxItems=maxRxItems)
createThread(th, taskFunc, ch)
ch.twist()
# ---------------------------------------------------------
# sync
proc taskSync(ch: WorkerChannel[int,string]) {.thread.} =
# echo ch.recvSync
for i in 1 .. 10:
ch.sendSync $i
proc worker() =
# var ch = newWorker(taskSync)
var th : Thread[WorkerChannel[int, string]]
var chan = newWorkerChannel[int, string]()
createThread(th, taskSync, chan)
var ch = chan.twist()
## this line lead to Illegal storage access
# ch.sendSync 10
while true:
echo ch.recvSync
worker()
# ---------------------------------------------------------
# async
proc taskAsync(ch: WorkerChannel[int,string]) {.async.} =
var i = 0
while true:
echo "sending ", i, " on thread=", getThreadId()
await ch.send($i)
inc i
await sleepAsync(500)
proc task(ch: WorkerChannel[int,string]) {.thread.} =
waitFor taskAsync(ch)
proc main() {.async.} =
# var chan = newAsyncWorker[int, string](task)
var chan = newWorker(task)
while true:
var i = await chan.recv
echo "recevied ", i
await sleepAsync(100)
## If I try to run task on main thread, this works.
# waitFor taskAsync(newWorkerChannel[int, string]()) # work
# task(newWorkerChannel[int, string]()) # work
## If I run on threads, some time I got Illegal storage access, some time I got exit after two recv
# asyncCheck main()
# try: runForever()
# except: discard