tl;dr: I'm issuing lots of small writes to an AsyncSocket, and some of them are being dropped: they never show up on the other end of the TCP connection, and their corresponding send Futures never complete.
I'm using the news package to send lots of small WebSocket messages (on the order of 30,000, mostly under 1 KB). My code used to look similar to this:
while hasMoreData():
  let data = nextMessage()
  await socket.send(data, Opcode.Binary)
This worked, but the TCP throughput was terrible. I figured the problem was that each write to the socket has some latency, and I was wasting time awaiting each send before starting the next.
So I tried some pipelining. My actual code was more sophisticated, but for simplicity's sake let's just say I removed the await, so the loop moves on to the next message before the previous write completes.
The problem now is that the client on the other end of the socket blows up: it turns out a whole run of consecutive WebSocket messages never arrived. Yikes! I can corroborate this by checking which send calls' Futures complete:
var Count = 0
var LastCompletedCount = 0
...
while hasMoreData():
  let data = nextMessage()
  Count += 1
  let count = Count
  capture count:
    socket.send(data, Opcode.Binary).addCallback proc(f: Future[void]) =
      assert not f.failed
      if count != LastCompletedCount + 1:
        echo &"*** Completed {count} but LastCompleted is {LastCompletedCount}!"
      LastCompletedCount = count
That 'echo' call appears after a few hundred messages:
*** Completed 652 but LastCompleted is 494!
and it corresponds to the missing messages on the receiving side. In other words, 157 writes (#495 through #651) never got acknowledged via their Futures and never got sent across the TCP connection, despite having been handed to the WebSocket's send()!
I'm using news, but its send method just calls through to AsyncSocket so I don't think it's the culprit. Is there any known problem with AsyncSocket? (This code is running on macOS 10.14.)
Wow, we now have 3 different websocket implementations. Nice :)
I must say I'm suspicious of news here. If I were you, I would do the following:
I wasn't sure which WebSocket module to use... news seems like the newest, since it's based on treeform/ws, which is based on niv/websocket ¯\_(ツ)_/¯ All three libraries are pretty immature,
I don't see how this can be news's fault, since if you look at the send() function it just formats the frame and calls AsyncSocket.send. (When not using Chronos, Transport is just a type alias of AsyncSocket.) The other two libraries do essentially the same thing.
OK, here's a reproducible test case (still using news). It opens a loopback connection; the client side sends messages as fast as it can, while the server reads them slowly to create some backpressure. The client checks the order in which the send futures complete, to make sure they're consecutive.
import asyncdispatch, asynchttpserver, news, strformat, sugar

proc main() {.async.} =
  # Server side:
  var server = newAsyncHttpServer()
  proc cb(req: Request) {.async.} =
    if req.url.path == "/ws":
      echo "Server WebSocket connected"
      var ws = await newWebsocket(req)
      while ws.readyState == Open:
        let packet = await ws.receivePacket()
        echo "<<< ", packet.data[0..80], "..."
        await sleepAsync 100
    await req.respond(Http200, "Hello World")
  asyncCheck server.serve(Port(9001), cb)

  # Client side:
  var Count = 0
  var LastCompletedCount = 0
  let socket = await newWebSocket("ws://localhost:9001/ws")
  echo "Client WebSocket connected"
  while true:
    Count += 1
    let count = Count
    var message = &"This is WebSocket message #{count}. "
    #echo ">>> ", message
    for i in 1..3: message = message & message  # Make it a bit longer
    capture count:
      socket.send(message, Opcode.Binary).addCallback proc(f: Future[void]) =
        # Callback when the send completes:
        assert not f.failed
        echo "--- finished sending #", count
        if count != LastCompletedCount + 1:
          echo &"FATAL ERROR: Completed #{count} but last completed was #{LastCompletedCount}!"
        assert count == LastCompletedCount + 1
        LastCompletedCount = count
    await sleepAsync 1

waitFor main()
When I run this on macOS 10.15.5, it completes sends #1 through #2065, pauses for a few seconds while the reader catches up, then completes #2066 through #2073, pauses again, then completes #2074, followed by some number around 6000, which triggers the assertion failure:
Server WebSocket connected
Client WebSocket connected
--- finished sending #1
--- finished sending #2
--- finished sending #3
--- finished sending #4
--- finished sending #5
--- finished sending #6
--- finished sending #7
...
...
--- finished sending #2074
--- finished sending #6138
FATAL ERROR: Completed #6138 but last completed was #2074!
The 6138 varies a bit from run to run, but the number before it is always 2074. I suppose if I did the math, this would work out to enough bytes to fill the socket's send buffer, in the kernel or in userspace.
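Doing that math for the WebSocket test case is easy to sketch. The snippet below rebuilds each message the same way the client loop does (the base text, doubled three times) and sums the payload sizes for messages #1 through #2074. It counts payload bytes only: it ignores WebSocket frame headers (a few bytes per frame, depending on length and on whether news masks client frames, which the thread doesn't say) and whatever the server had already read, and it doesn't tell you the actual buffer sizes, which depend on the OS and socket settings. `messageFor` and `payloadBytes` are hypothetical helper names.

```nim
import strformat

proc messageFor(count: int): string =
  ## Rebuilds message #count the way the test client does:
  ## the base text, doubled three times (8x its original length).
  result = &"This is WebSocket message #{count}. "
  for i in 1..3:
    result = result & result

proc payloadBytes(lastCount: int): int =
  ## Total payload bytes for messages #1..#lastCount (frame headers excluded).
  for count in 1..lastCount:
    result += messageFor(count).len

when isMainModule:
  echo messageFor(2074).len   # 264
  echo payloadBytes(2074)     # 538680
```

That comes to roughly 526 KiB of payload through message #2074, which is the figure to compare against the send and receive buffer sizes on both ends of the loopback connection.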
I'll try adapting this to use AsyncSocket directly, then file a bug report.
Here's the plain AsyncSocket version. The message counts are different but the failure is the same.
import asyncdispatch, asyncnet, strformat, sugar

const FrameSize = 999

proc runServer() {.async.} =
  # Server side:
  var server = newAsyncSocket()
  server.bindAddr(Port(9001))
  server.listen()
  let client = await server.accept()
  echo "Server got client connection"
  while true:
    let frame = await client.recv(FrameSize)
    assert frame.len == FrameSize
    echo "<<< ", frame[0..80], "..."
    await sleepAsync 100

proc main() {.async.} =
  asyncCheck runServer()
  # Client side:
  var Count = 0
  var LastCompletedCount = 0
  let socket = newAsyncSocket()
  await socket.connect("localhost", Port(9001))
  echo "Client socket connected"
  while true:
    Count += 1
    let count = Count
    var message = &"This is message #{count}. "
    #echo ">>> ", message
    while message.len < FrameSize:
      message = message & message
    let frame = message[0..<FrameSize]
    capture count:
      socket.send(frame).addCallback proc(f: Future[void]) =
        # Callback when the send completes:
        assert not f.failed
        echo "--- finished sending #", count
        if count != LastCompletedCount + 1:
          echo &"FATAL ERROR: Completed #{count} but last completed was #{LastCompletedCount}!"
        assert count == LastCompletedCount + 1
        LastCompletedCount = count
    await sleepAsync 1

waitFor main()
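If the problem really only appears when many send() futures are pending on the same socket at once, as both test cases suggest, one possible workaround until the bug is fixed is to keep the producer loop non-blocking while letting only a single writer touch the socket. This is a sketch under that assumption, not a fix for the underlying issue; `SendQueue`, `WriteProc`, `newSendQueue`, `enqueue` and `runWriter` are hypothetical names. `enqueue` never awaits; `runWriter` joins whatever has queued up into one string, awaits a single write, and repeats, so at most one write is outstanding, and fewer, larger writes may also help with the throughput problem that motivated the pipelining. The writer is passed in as a closure so the demo runs without a socket.

```nim
import asyncdispatch, deques

type
  # Hypothetical writer callback, e.g. a closure around AsyncSocket.send.
  WriteProc = proc (data: string): Future[void] {.closure.}

  # Hypothetical helper: keeps at most one write pending at a time.
  SendQueue = ref object
    writer: WriteProc
    pending: Deque[string]
    writing: bool

proc newSendQueue(writer: WriteProc): SendQueue =
  SendQueue(writer: writer, pending: initDeque[string](), writing: false)

proc runWriter(q: SendQueue) {.async.} =
  ## Single writer loop: joins everything queued so far into one
  ## string, awaits one write for it, and repeats until the queue is empty.
  q.writing = true
  try:
    while q.pending.len > 0:
      var batch = ""
      while q.pending.len > 0:
        batch.add q.pending.popFirst()
      await q.writer(batch)
  finally:
    q.writing = false

proc enqueue(q: SendQueue, data: string) =
  ## Never blocks the caller; starts the writer loop only if it is idle.
  q.pending.addLast data
  if not q.writing:
    asyncCheck q.runWriter()

when isMainModule:
  var batches: seq[string]
  let q = newSendQueue(proc (data: string): Future[void] =
    batches.add data   # record what would have been written
    sleepAsync(1))     # pretend each write takes a little while
  q.enqueue("a")       # starts the writer; "a" is written immediately
  q.enqueue("b")       # queued while "a" is still in flight
  q.enqueue("c")
  while q.writing:
    poll()
  echo batches         # @["a", "bc"]
```

With the plain AsyncSocket test you would create the queue with `newSendQueue(proc (data: string): Future[void] = socket.send(data))` and call `q.enqueue(frame)` in place of `socket.send(frame)`; the ordering check then moves to the receiver. Joining writes is only safe for a raw byte stream like that one, where the server reads fixed 999-byte frames; with news you would send each queued message separately instead, since concatenating them would merge WebSocket messages. The queue is also unbounded, so a producer that outruns the socket will grow it without limit.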
Filed https://github.com/nim-lang/Nim/issues/15003
I'd appreciate it if anyone could run the test case (there's a newer version attached to the GitHub issue) on non-Mac systems and see if it reproduces. The bug may well be specific to kqueue, in which case it would only show up on Apple platforms and the BSDs.
In case you haven't gotten feedback from Linux users yet, here's the GitHub test case running under Linux (Debian bullseye, kernel 5.7):
[...]
SENT #18336
SENT #18337
SENT #18338
SENT #18339
SENT #18340
RCVD #647: 000647 This is message #647 of ∞. Please stay tuned for more. 000647 This is me...
******** ERROR: Server received #647, but last was #420!
/home/[...]asytest/asytest.nim(63) asytest
/usr/lib/nim/pure/asyncdispatch.nim(1887) waitFor
/usr/lib/nim/pure/asyncdispatch.nim(1577) poll
/usr/lib/nim/pure/asyncdispatch.nim(1341) runOnce
/usr/lib/nim/pure/asyncdispatch.nim(210) processPendingCallbacks
/usr/lib/nim/pure/asyncmacro.nim(34) runServerNimAsyncContinue
/home/[...]asytest/asytest.nim(25) runServerIter
/usr/lib/nim/system/assertions.nim(29) failedAssertImpl
/usr/lib/nim/system/assertions.nim(22) raiseAssert
/usr/lib/nim/system/fatal.nim(49) sysFatal
Hth …
Same assertion error on Linux.
...
RCVD #269: 000269 This is message #269 of ∞. Please stay tuned for more. 000269 This is me...
RCVD #270: 000270 This is message #270 of ∞. Please stay tuned for more. 000270 This is me...
RCVD #715: 000715 This is message #715 of ∞. Please stay tuned for more. 000715 This is me...
******** ERROR: Server received #715, but last was #270!
toto.nim(63) toto
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1932) waitFor
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1614) poll
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1360) runOnce
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(208) processPendingCallbacks
.choosenim/toolchains/nim-#devel/lib/pure/asyncmacro.nim(23) runServerNimAsyncContinue
toto.nim(25) runServerIter
.choosenim/toolchains/nim-#devel/lib/system/assertions.nim(29) failedAssertImpl
.choosenim/toolchains/nim-#devel/lib/system/assertions.nim(22) raiseAssert
.choosenim/toolchains/nim-#devel/lib/system/fatal.nim(49) sysFatal
[[reraised from:
toto.nim(63) toto
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1932) waitFor
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1614) poll
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1360) runOnce
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(208) processPendingCallbacks
.choosenim/toolchains/nim-#devel/lib/pure/asyncfutures.nim(431) asyncCheckCallback
]]
Error: unhandled exception: toto.nim(25, 16) `n == lastN + 1`
Async traceback:
toto.nim(63) toto
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1932) waitFor
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1614) poll
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(1360) runOnce
.choosenim/toolchains/nim-#devel/lib/pure/asyncdispatch.nim(208) processPendingCallbacks
.choosenim/toolchains/nim-#devel/lib/pure/asyncmacro.nim(23) runServerNimAsyncContinue
toto.nim(25) runServerIter
.choosenim/toolchains/nim-#devel/lib/system/assertions.nim(29) failedAssertImpl
.choosenim/toolchains/nim-#devel/lib/system/assertions.nim(22) raiseAssert
.choosenim/toolchains/nim-#devel/lib/system/fatal.nim(49) sysFatal
Exception message: toto.nim(25, 16) `n == lastN + 1`
Exception type: [AssertionDefect]
Error: execution of an external program failed: 'toto '