I've seen the thread from a year ago about "Data loss with async sockets", but I decided to start a new one. As far as I can see, the stdlib's async implementation still completely lacks cancellation points in futures, and this code behaves incorrectly and unintuitively:
import asyncdispatch

proc test() {.async.} =
  await sleepAsync(1000)
  echo "test"

proc run() {.async.} =
  let fut = test()
  fut.fail(newException(Exception, "fail"))
  await sleepAsync(2000)

waitFor(run())
Expected behaviour: The test async proc is cancelled while awaiting sleepAsync, and all of its callbacks are then called with the failure.
Current behaviour: The test async proc continues to run, finishes once sleepAsync returns, and its future ends up completed a second time.
I don't think this is normal, and it shouldn't be very hard to fix. I'm trying to write a RabbitMQ driver in Nim, and I'm not sure it is a good idea to write "if completed: break" or something similar everywhere in async callbacks that sometimes need to be cancelled. Are there any workarounds for this problem? (I know about chronos, but I really don't want to switch to an unofficial event loop in my lib.)
The current stdlib implementation does not support arbitrary cancellation of futures. What you can do to cancel an operation is close the FD (socket) that it is working on.
This might work for your use case, but if not then I’d be interested to hear about it so we can support it.
Let me rewrite the code above a bit:
import asyncdispatch

var connectionAlive: bool = true

proc test() {.async.} =
  var x = 0
  while connectionAlive:
    # send heartbeat info
    await sleepAsync(1000) # await for heartbeat timeout
    # check if we got any heartbeat reply from server or close the connection with error
    echo "test loop: " & $x
    inc x # count iterations so the output shows progress

proc run() {.async.} =
  let fut = test()
  await sleepAsync(2100)
  # if i get a ConnectionClose command somewhere here
  fut.fail(newException(Exception, "fail"))
  await sleepAsync(500)

waitFor(run())
This is a sample situation where, for some reason (e.g. on a ConnectionClose command), I need to cancel any pending awaited operations. In this case it is a heartbeat coroutine. With the std async functionality, the only thing I can do right now is add a new variable to the while loop and break out of the loop on the next iteration. That can be a problem if there is any waiting lock inside the loop, because then there is no way to predict when the next iteration will happen.
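One possible workaround for the heartbeat case, sketched under assumptions: asyncdispatch exports an `or` operator on futures, so the loop can wait on "the timer or a close signal", whichever finishes first. The names heartbeat, closeSignal and run below are hypothetical, and the heartbeat I/O is left out. This does not cancel the pending timer; the loop just stops waiting for it.

```nim
import std/asyncdispatch

# Hypothetical heartbeat loop: wakes up on the timer OR on the close signal.
proc heartbeat(closeSignal: Future[void]): Future[int] {.async.} =
  var beats = 0
  while true:
    # `or` completes as soon as either future completes.
    await (sleepAsync(1000) or closeSignal)
    if closeSignal.finished:
      break # close requested: leave without waiting for the timer
    inc beats # a real driver would send/check the heartbeat here
  return beats

proc run(): Future[int] {.async.} =
  let closeSignal = newFuture[void]("closeSignal")
  let fut = heartbeat(closeSignal)
  await sleepAsync(1500)
  # e.g. a ConnectionClose command arrived: signal the loop
  closeSignal.complete()
  return await fut

let beats = waitFor run()
echo "heartbeats before close: ", beats
```

Only the owner of closeSignal completes it, and the loop's own future is never failed from the outside, so there is no double completion.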
If your goal is just to have a future with a timeout, I have an example that wraps the request proc of asyncHttpClient, which does not support pooling or timeouts yet.
proc request*(
  pool: HttpClientPool,
  url: string,
  httpMethod: string,
  body = "",
  headers: HttpHeaders = nil,
  multipart: MultipartData = nil,
  timeout = 5000,
): Future[AsyncResponse] {.async.} =
  let client = await pool.clients.dequeue()
  defer: pool.clients.enqueue(client)
  let fut = newFuture[AsyncResponse]("request") #-------------------------------------- 1
  proc cb1(fut1: Future[AsyncResponse]) =
    if not fut.finished: # ------------------------------------------------------------ 4
      if fut1.failed: fut.fail(fut1.readError())
      else: fut.complete(fut1.read())
  proc cb2(fut2: Future[void]) =
    if not fut.finished: # ------------------------------------------------------------ 4
      client.close() # ---------------------------------------------------------------- 5
      fut.fail newException(RequestTimeoutError, fmt"timeout={timeout}ms")
  let fut1 = client.request(url, httpMethod, body, headers, multipart) # -------------- 2
  let fut2 = sleepAsync(timeout) # ---------------------------------------------------- 3
  fut1.addCallback cb1
  fut2.addCallback cb2
  return await fut # ------------------------------------------------------------------ 6
This is probably not the most resource-efficient way, but it currently works for me. I'd love to hear any ways to improve it.
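For the plain timeout case, asyncdispatch also ships withTimeout, which races a future against a timer and reports which one won. A minimal sketch, where slowOperation and fetchWithTimeout are hypothetical stand-ins for a real request. Like the wrapper above, it does not stop the underlying operation; the caller still has to close the client or socket if that is needed.

```nim
import std/asyncdispatch

# Hypothetical stand-in for a slow request.
proc slowOperation(): Future[string] {.async.} =
  await sleepAsync(300)
  return "done"

proc fetchWithTimeout(timeoutMs: int): Future[string] {.async.} =
  let fut = slowOperation()
  # withTimeout yields true if `fut` finished before the timeout elapsed.
  let finishedInTime = await withTimeout(fut, timeoutMs)
  if finishedInTime:
    return fut.read()
  else:
    # `fut` keeps running in the background; it is only abandoned here.
    return "timed out"

let fast = waitFor fetchWithTimeout(1000)
let slow = waitFor fetchWithTimeout(50)
echo fast
echo slow
```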
Besides, I've been wanting a RabbitMQ client recently too~
chronos supports cancellation, though in a slightly different way: you don't complete the future with fail, you cancel it. Using fail like that might be hard to reason about, because you now have multiple "owners" fulfilling the promise.
import chronos

proc test() {.async.} =
  await sleepAsync(1000)
  echo "test"

proc run() {.async.} =
  let fut = test()
  fut.cancel()
  await sleepAsync(2000)

waitFor(run())
Modifying your 2nd example:
import std/[asyncdispatch, sugar]

var connectionAlive: bool = true

type Cancellation = proc(): bool

proc test(c: Cancellation) {.async.} =
  var x = 0
  while connectionAlive:
    await sleepAsync(1000) # await for heartbeat timeout
    let cancelled = c()
    if cancelled:
      echo "received Close command, stop"
      break
    # check if we got any heartbeat reply from server or close the connection with error
    echo "test loop: " & $x
    inc x # count iterations so the output shows progress

proc run() {.async.} =
  var cancelled = false
  let fut = test(() => cancelled)
  await sleepAsync(1500)
  # if i get a ConnectionClose command somewhere here
  #fut.fail(newException(Exception, "fail"))
  cancelled = true
  await sleepAsync(500)

waitFor(run())
The scenario is
If we change #4 to a no-op (by commenting out the line cancelled = true), test will still be running in the background and will print the "test loop: " line twice.
There is one thing we can do nothing about: while test is awaiting, it cannot evaluate its cancellation state. Anything already sent (over the network, written, read, or any other I/O) is already done and cannot be cancelled. So if test is awaiting for 15 minutes and the connection close command arrives early, test will still wait out the full 15 minutes.
If many things can happen in the span of those 15 minutes, the cancellation state may be cancelled and resumed several times (call it cancelleception) from many places. This design invites heisenbugs and will be hard to reason about.
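For pure waits (not I/O already in flight), one way to bound that delay is to split the long sleep into short slices and check the flag between slices. This is only a sketch: CancelToken, cancellableSleep, the 50 ms slice size and the timings are all assumptions made for illustration, and the cancellation is noticed up to one slice late.

```nim
import std/asyncdispatch

type CancelToken = ref object
  cancelled: bool

# Sleeps up to totalMs in slices of sliceMs.
# Returns true if the full time elapsed, false if cancelled on the way.
proc cancellableSleep(token: CancelToken, totalMs: int,
                      sliceMs = 50): Future[bool] {.async.} =
  var remaining = totalMs
  while remaining > 0:
    if token.cancelled:
      return false
    let step = min(sliceMs, remaining)
    await sleepAsync(step)
    remaining -= step
  return not token.cancelled

proc heartbeat(token: CancelToken): Future[int] {.async.} =
  var beats = 0
  while true:
    let completed = await cancellableSleep(token, 1000)
    if not completed:
      break # cancelled while waiting for the next heartbeat
    inc beats
  return beats

proc run(): Future[int] {.async.} =
  let token = CancelToken()
  let fut = heartbeat(token)
  await sleepAsync(1500)
  token.cancelled = true # e.g. the Close command arrived
  return await fut

let beats = waitFor run()
echo "heartbeats before cancel: ", beats
```

The trade-off is extra timer wake-ups: a smaller slice reacts faster but polls more often.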
Btw, I never tested something like this, so could you test it for your use case? I kinda want to see whether this actually works or not :P