Hello guys, today I finished working on a production module for a startup. It's a pub/sub service that uses WebSockets and Jester for real-time notifications. To broadcast messages to all open connections, the code stores them by topic in a variable (socketPool) of type HashRef[string, HashSet[WebSocket]] and uses some utility procs (isTopicValid, addSocketInTopic, removeSocketFromTopic) to handle the pool operations.
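The pool and its helpers are not shown in the post. A minimal sketch of what they might look like, assuming HashRef behaves like TableRef from std/tables, that topics are registered up front, and using a hypothetical FakeSocket type in place of the real WebSocket so the snippet compiles on its own:

```nim
import std/[tables, sets, hashes]

type
  # Hypothetical stand-in for the real WebSocket type (assumption).
  FakeSocket = ref object
    id: int

  # Assumption: the post's HashRef is equivalent to std/tables' TableRef.
  SocketPool = TableRef[string, HashSet[FakeSocket]]

# A HashSet needs a hash proc for its element type; here we hash by id
# and assume ids are unique per connection.
proc hash(s: FakeSocket): Hash = hash(s.id)

proc newSocketPool(topics: openArray[string]): SocketPool =
  ## Registers every allowed topic with an empty set of sockets.
  result = newTable[string, HashSet[FakeSocket]]()
  for t in topics:
    result[t] = initHashSet[FakeSocket]()

proc isTopicValid(pool: SocketPool, topic: string): bool =
  ## A topic is valid only if it was registered when the pool was created.
  pool.hasKey(topic)

proc addSocketInTopic(pool: SocketPool, topic: string, s: FakeSocket) =
  ## Adds the socket to the topic's set; unknown topics are ignored.
  if pool.hasKey(topic):
    pool[topic].incl(s)

proc removeSocketFromTopic(pool: SocketPool, topic: string, s: FakeSocket) =
  ## Removes the socket from the topic's set; a no-op if it is absent.
  if pool.hasKey(topic):
    pool[topic].excl(s)

# Usage: one topic, one connection added and then removed again.
let demoPool = newSocketPool(["news"])
let demoSocket = FakeSocket(id: 1)
demoPool.addSocketInTopic("news", demoSocket)
demoPool.removeSocketFromTopic("news", demoSocket)
```

Note that removing a socket from the set only drops the reference held by the pool; it does not close the underlying connection.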
get "/sub/@topic/@authToken":
  try:
    let topic = @"topic"
    let token = @"authToken"
    if not request.headers.table.contains("upgrade"):
      resp Http426, $ %*{"msg": "http_not_supported"}, appType
    if not socketPool.isTopicValid(topic):
      resp Http404, $ %*{"msg": "invalid_topic"}, appType
    let authProxy = newAuthenticator()
    let (status, response) = await authProxy.authenticate(token)
    if status != 200:
      resp HttpCode(status), response, appType
    var ws = await newWebSocket(request)
    socketPool.addSocketInTopic(topic, ws)
    try:
      while ws.readyState == Open:
        discard await ws.receiveStrPacket()
    except:
      discard
    finally:
      socketPool.removeSocketFromTopic(topic, ws)
    resp Http200, $ %*{"msg": "succeed"}, appType
  except Exception as e:
    echo e.msg
    resp Http500, $ %*{"msg": "exception"}, appType
When I run this code it works without problems, but once I started stress testing I noticed weird things. If I open many WebSocket connections to that endpoint (say, more than 500), even ones that don't pass the authentication check, the route collapses and rejects all new incoming connections. Even if I free all current connections and the socketPool goes empty, it keeps rejecting connections until I restart the program. When this happens, the echo e.msg statement in the except block prints the following error: Exception message: Bad file descriptor. I researched this and it seems to happen when a TCP socket is replaced by a new one without first being closed. I decided to try a new route, something like this:
get "/try":
  try:
    if not request.headers.table.contains("upgrade"):
      resp Http426, $ %*{"msg": "http_not_supported"}, appType
    var ws = await newWebSocket(request)
    try:
      while ws.readyState == Open:
        discard await ws.receiveStrPacket()
    except:
      discard
    resp Http200, $ %*{"msg": "succeed"}, appType
  except Exception as e:
    echo e.msg
    resp Http500, $ %*{"msg": "exception"}, appType
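For background on the error message itself: on POSIX systems, Bad file descriptor is the text for EBADF, which a system call returns when it is given a descriptor number that is not currently open. The sketch below is independent of Jester and the WebSocket library. It assumes a POSIX platform and only reproduces the error with a deliberate double close on a pipe; it does not show where either route goes wrong.

```nim
import std/posix

# Create a pipe just to obtain two valid file descriptors.
var fds: array[2, cint]
doAssert pipe(fds) == 0

# The first close succeeds and releases the descriptor number.
doAssert close(fds[0]) == 0

# Closing the same number again fails because it is no longer open.
let rc = close(fds[0])
let err = errno          # read errno right after the failing call

doAssert rc == -1
doAssert err == EBADF

# Prints the platform's text for EBADF.
echo strerror(err)

# Clean up the other end of the pipe.
discard close(fds[1])
```

Any code path that keeps using a socket after it has been closed elsewhere would surface the same error.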
When I overload the /try route with many new incoming connections, it collapses just like the previous route. Surprisingly, though, this route recovers from the error: once the rate of new connections drops, it can handle new connections again. This made me think that the difference between the two routes is that the first one performs operations on a data structure. I know this is a fairly complex program, but it looks to me like some sort of race condition when many connections are created in a short period of time. If you think that's the case, a hint for a workaround would be great, and if you have another idea about what the problem is, I'd appreciate it. Thanks!

Can you try to close the websocket manually?
proc close*(ws: WebSocket) =