I'm still trying to bend SSE (server-sent events) my way, but I'm running into a problem. I'll try to describe it through the process flow I envisioned. The full code is below (using the Prologue web server).
The client connects to the web server with GET sse/. The sse handler accepts the request and sends back the headers, among them keep-alive. The connection is then added to a list, together with an asyncstream on which a callback is set. Data comes in from an external or internal source; in this case, a heartbeat. The data is written to an asyncstream that has a fan-out proc as its callback. This proc loops over the list and writes the data to all listening clients. But when sse/ is done, the client seems to disconnect. The whole process works when the messages are sent from sse/ directly.
I cannot figure out why it disconnects.
In the code, the AsyncSocket is added to the list. I also tried the Prologue Context and the Context.Request object, with the same result.
import prologue, times, strformat, asyncstreams, asyncnet
type
  # Registry of connected SSE clients. Each entry pairs a client socket with
  # the stream that carries the (socket, payload) messages destined for it.
  Clients* = ref seq[(AsyncSocket, FutureStream[(AsyncSocket, string)])]

# Module-wide client registry: allocated once and explicitly reset to empty.
var sseClientList {.global.}: Clients = new(Clients)
sseClientList[] = @[]
proc sseHeartbeat(fs: FutureStream, interval: float = 30) {.async.} =
  ## Emits a server-sent-event heartbeat forever. A heartbeat is just a
  ## comment line without content.
  ##
  ## fs: FutureStream the heartbeat is written to (the fan-out input).
  ## interval: pause between heartbeats in seconds, default 30 s.
  let pauseMs = interval * 1000
  while true:
    await sleepAsync(pauseMs)
    # No client registered: skip this beat.
    if sseClientList[].len == 0:
      continue
    await fs.write(":\n\n")
proc sseFanOut(fs: FutureStream[string]) {.gcsafe.} = # unsafe!!! reads a global
  ## Callback of the fsInput stream: takes one message off `fs` and writes it
  ## to the stream of every client in `sseClientList`.
  ##
  ## The proc is invoked from inside the dispatcher, so it never calls
  ## `waitFor` (which may re-enter `poll()`); it only inspects futures that
  ## are expected to be resolved already.
  echo "fanout"
  let pending = fs.read()
  # The callback fires after a write (or completion) of `fs`, so the read is
  # expected to be resolved here; bail out instead of blocking if it is not.
  if not pending.finished or pending.failed:
    return
  let (hasContent, data) = pending.read()
  if not hasContent:
    return
  echo "len client list at fanout: ", len(sseClientList[])
  for entry in sseClientList[]:
    let (client, clientStream) = entry
    let delivery = clientStream.write((client, data))
    # A client stream that rejects the write must not stop delivery to the
    # remaining clients.
    if delivery.failed:
      echo "fanout: write to client stream failed: ", delivery.readError.msg

# Data written to fsInput is distributed to the clients through sseFanOut.
var fsInput = newFutureStream[string](fromProc = "main")
fsInput.callback = sseFanOut
proc sseEmitterCb(fs: FutureStream[(AsyncSocket, string)]) =
  ## Callback of a per-client stream: takes one (socket, payload) pair off
  ## `fs` and sends the payload to that socket.
  ##
  ## The proc is invoked from inside the dispatcher, so it must not block.
  ## The send is only started here; its outcome is reported from a completion
  ## callback instead of being waited for with `waitFor`, which would re-enter
  ## `poll()` from within a callback.
  echo "emit"
  let pending = fs.read()
  # The callback fires after a write (or completion) of `fs`, so the read is
  # expected to be resolved here; bail out instead of blocking if it is not.
  if not pending.finished or pending.failed:
    return
  let (hasContent, payload) = pending.read()
  if not hasContent:
    return
  let (client, data) = payload
  let sending = client.send(data)

  proc reportOutcome() =
    # Runs once the send future has been resolved by the dispatcher.
    if sending.failed:
      echo "emit failed: ", sending.readError.msg
    else:
      echo "emitted\n"

  sending.addCallback(reportOutcome)
proc sse(ctx: Context) {.async, gcsafe.} =
  ## Handler for the SSE route: answers with the event-stream headers,
  ## registers the client's socket together with a fresh per-client stream in
  ## `sseClientList`, and pushes two initial messages through `fsInput`.
  #echo ctx.request.nativeRequest.client[]
  let stamp = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
  let headers = {
    "Date": stamp & " GMT",
    "Access-Control-Allow-Origin": "*",
    "Cache-Control": "no-cache",
    "Content-type": "text/event-stream; charset=utf-8",
    "Connection": "keep-alive",
    "Content-Length": ""
  }
  await ctx.respond(Http200, "", initResponseHeaders(headers))
  ctx.handled = true

  # Register the client; sseEmitterCb forwards whatever lands on its stream.
  let clientStream = newFutureStream[(AsyncSocket, string)](fromProc = "main")
  sseClientList[].add((ctx.request.nativeRequest.client, clientStream))
  clientStream.callback = sseEmitterCb

  for message in ["event: connection\ndata: connected\n\n",
                  ":and now, for something completely different...\n\n"]:
    await fsInput.write(message)
  # Sending through fsInput works for these two messages; after that the
  # connection appears to be dropped by the client and the first heartbeat
  # never finishes.
# Server settings: address and port come from the .env file, with fallbacks.
let env = loadPrologueEnv(".env")
let appSettings = newSettings(
  debug = true,
  address = env.getOrDefault("address", "192.168.1.4"),
  port = Port(env.getOrDefault("port", 8088)),
  reusePort = false
)

# Start the heartbeat (interval argument: 5) feeding the fan-out input.
asyncCheck sseHeartbeat(fsInput, 5)

# Register the SSE route and run the application.
var app = newApp(settings = appSettings)
app.addRoute("/sse", sse, HttpGet)
app.run()
Here is a simplified version. Two things went wrong. Somehow I got it into my head that a callback is required for the asyncstream. It is not, and that makes life simpler: there is no more need to deal with the connections directly; just link the socket fd to a specific stream. The other problem was a waitFor instead of an await; the waitFor more or less blocked.
What does not work is detecting client disconnects. In Python I'd use os.fstat(fd), but fstat(fd) is not available, stat() requires a string, and req.client.getFd is not a file descriptor. Is there a Nim alternative for fstat?
import asynchttpserver, asyncdispatch, asyncnet
import times, strformat, random, os, tables
# Registry of connected SSE clients: socket fd (as int) -> the stream that
# carries the events destined for that client.
var sseClients {.global.} = initTable[int, FutureStream[string]]()
proc sseHeartbeat(fs: FutureStream, interval: float = 30) {.async.} =
  ## Emits a server-sent-event heartbeat forever. A heartbeat is just a
  ## comment line without content.
  ##
  ## fs: FutureStream the heartbeat is written to.
  ## interval: pause between heartbeats in seconds, default 30 s.
  let pauseMs = interval * 1000
  while true:
    await sleepAsync(pauseMs)
    # No client registered: skip this beat.
    if sseClients.len == 0:
      continue
    await fs.write(":\n\n")
proc sseTime(fs: FutureStream, interval: float = 10) {.async.} =
  ## A data provider: forever writes a `time` event carrying the current UTC
  ## timestamp to `fs`, then pauses for `interval` seconds (default 10 s).
  ## The event is only written while at least one client is registered.
  let pauseMs = interval * 1000
  while true:
    let now = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    let anyoneListening = sseClients.len > 0
    if anyoneListening:
      await fs.write(&"event: time\ndata: {now}\n\n")
    await sleepAsync(pauseMs)
proc sseFanOut(fs: FutureStream[string]) {.gcsafe.} = #unsafe! reads a global
  ## Callback of the fsInput stream: takes one message off `fs` and writes it
  ## to the stream of every client registered in `sseClients`.
  ##
  ## The proc is invoked from inside the dispatcher, so it never calls
  ## `waitFor` (which may re-enter `poll()`); it only inspects futures that
  ## are expected to be resolved already.
  let pending = fs.read()
  # The callback fires after a write (or completion) of `fs`, so the read is
  # expected to be resolved here; bail out instead of blocking if it is not.
  if not pending.finished or pending.failed:
    return
  let (hasContent, data) = pending.read()
  if not hasContent:
    return
  for clientStream in sseClients.values:
    let delivery = clientStream.write(data)
    # A client stream that rejects the write must not stop delivery to the
    # remaining clients.
    if delivery.failed:
      echo "fanout: write to client stream failed: ", delivery.readError.msg
proc sse(req: Request) {.async, gcsafe.} =
  ## Serves one server-sent-events client for as long as its connection
  ## accepts data.
  ##
  ## Sends the event-stream response headers, registers a per-client stream in
  ## `sseClients` under the socket's fd, sends a `connect` event and then
  ## forwards every message read from that stream to the client.
  ##
  ## A client disconnect shows up as a failing `send`. The error is caught
  ## here so it does not escape the handler, and the client is removed from
  ## `sseClients` before the proc returns.
  let stamp = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
  # Exactly one Content-type: the stream must be announced as
  # text/event-stream, so the conflicting text/plain entry is gone.
  # NOTE(review): the empty Content-Length presumably keeps `respond` from
  # appending its own `Content-Length: 0` -- confirm against asynchttpserver.
  let headers = {
    "Date": &"{stamp} GMT",
    "Access-Control-Allow-Origin": "*",
    "Cache-Control": "no-cache",
    "Content-type": "text/event-stream; charset=utf-8",
    "Connection": "keep-alive",
    "Content-Length": ""
  }
  await req.respond(Http200, "", headers.newHttpHeaders())

  let fd = int(req.client.getFd)
  echo fd
  # Keep a direct reference to this client's stream: the table slot may be
  # overwritten if the same fd number is registered again by a later request.
  let stream = newFutureStream[string](fromProc = "main")
  sseClients[fd] = stream

  try:
    await req.client.send("event: connect\ndata: connected\n\n")
    while true:
      let (hasContent, data) = await stream.read() # not waitFor!
      if not hasContent:
        # The stream was completed: nothing more will ever arrive.
        break
      await req.client.send(data)
      echo "."
  except CatchableError:
    echo "sse client ", fd, " dropped: ", getCurrentExceptionMsg()

  # Unregister, but only if the slot still belongs to this connection.
  if sseClients.getOrDefault(fd) == stream:
    sseClients.del(fd)
proc main {.async.} =
  ## Wires the data providers to the fan-out stream and runs the HTTP server
  ## that serves every request with `sse`.
  const
    bindAddress = "192.168.1.4"
    bindPort = 8088

  # Data written to fsInput is distributed to the clients through sseFanOut.
  let fsInput = newFutureStream[string](fromProc = "main")
  fsInput.callback = sseFanOut
  asyncCheck sseHeartbeat(fsInput, 5)
  asyncCheck sseTime(fsInput, 1)

  let server = newAsyncHttpServer()
  server.listen(Port(bindPort), bindAddress)
  echo bindAddress, ":", bindPort
  while true:
    if server.shouldAcceptRequest():
      await server.acceptRequest(sse)
    else:
      # The server declines new requests for now: yield to the dispatcher and
      # retry shortly instead of calling poll() from inside an async proc.
      await sleepAsync(500)

# Entry point: schedule main on the dispatcher and run the event loop.
asyncCheck main()
runForever()