My naive start failed miserably in all sort of ways. A nudge in the right direction is welcome. Also a tutorial on the whole future async await thing with as little lingo as possible would be welcome. I've not found one yet.
import asyncdispatch, asyncstreams, asyncfutures
proc cbEcho(fs:FutureStream): Future[void] {.async.}=
if not fs.finished:
echo(fs.read.read)
proc streamWriter(fs:FutureStream): Future[void] {.async.}=
for i in 0..9:
fs.write(i)
await sleepAsync(1000)
fs.complete
var nFS = newFutureStream[int](fromProc = "cbEcho")
nFS.callback = (proc() = cbEcho(nFS)) #?????
wait_for streamWriter(nFS)
TIA
I am not an async expert in the slightest, but at least this code compile and run:
Here is a tutorial on async programming in Nim, written for people who've never done async programming before: https://peterme.net/asynchronous-programming-in-nim.html
As for you specific code I have fixed it up here. It was a combination of minor errors, so I've added comments with the things I've changed (apart from some simple style fixes):
import asyncdispatch, asyncstreams, asyncfutures
proc cbEcho(fs: FutureStream[int]) = # This callback can't be async, and must have the right generic argument to FutureStream
# Don't need to check for finished, the returned tuple includes a boolean saying if it is valid or not
let data = waitFor fs.read # waitFor is safe here since we know that fs has data
if data[0]: # The tuple doesn't have named fields for some reason
echo data[1]
proc streamWriter(fs: FutureStream) {.async.} = # Doesn't need the Future[void] return type
for i in 0..9:
await fs.write(i) # Write is an async procedure and must be awaited
await sleepAsync(1000)
fs.complete
var nFS = newFutureStream[int](fromProc = "main") # This is from the main proc, not from cbEcho. However this is only used for error messages
nFS.callback = cbEcho
waitFor streamWriter(nFS)
I would do it this way:
import asyncdispatch, asyncstreams, asyncfutures
proc streamWriter(fs: FutureStream) {.async.} =
for i in 0..9:
await fs.write(i)
await sleepAsync(100)
fs.complete
proc main() {.async.} =
var nFS = newFutureStream[int](fromProc = "main")
asyncCheck streamWriter(nFS)
while true:
let (hasData, data) = await nFS.read()
if not hasData: break
echo(data)
waitFor main()
Two follow up questions:
in the @PMunch blog/tut, the first example starts the run with waitFor tickerFuture and delayedEchoFuture. In the second it is started with waitFor sleepAsync(2500). That last felt like 'magic'. How/why does it work? Or is it 'as long as the dispacher is progressed'
The comment: proc cbEcho(fs: FutureStream[int]) = # This callback can't be async, and must have the right generic argument to FutureStream
Does it mean the callback can never be async?
What if I would want something like this not working sketch:
import asyncdispatch, asyncstreams, asyncfutures
var fsList: seq[FutureStream[int]] #clients
proc cbEcho(fs: FutureStream[int]) =
# emits data to listening client
let data = waitFor fs.read
if data[0]:
echo data[1]
proc registerFS(fsList: var seq[FutureStream[int]], cb: proc) =
fsList.add(newFutureStream[int](fromProc = "main"))
fsList[len(fsList) - 1].callback = cb
proc fanOut(fs: FutureStream[int])=
# distributes incomming data over clients
let data = waitFor fs.read
if data[0]:
for f in fsList:
waitFor f.write(data[1])
#await sleepAsync(10)
var fsInput = newFutureStream[int](fromProc = "main")
fsInput.callback = fanOut
for i in 1..5: # 5 clients listening
registerFS(fsList, cbEcho)
for i in 0..9: #data from one input
fsInput.write(1)
waitFor fanOut()