Still trying to bend SSE my way, but running into a problem. I'll try to describe it by the process flow I envisioned. Full code below (using Prologue server)
Client connects to web server, GET: sse/. sse accepts and sends back headers, among them keep-alive. Then the connection is added to a list. Also added is an asyncstream and a callback is created. Data comes in from an external, or internal source. In this case heartbeat. The data is written to an asyncstream that has a fanout proc as callback. This proc loops the list and writes the data to all the listening clients. But, when see/ is done the client seems to disconnect. The whole process works when sending messages from sse/ directly.
I cannot figure out why it disconnects?
In the code the asyncsocket is added to the list. I tried with the prologue Context and the Context.Request object with the same result.
import prologue, times, strformat, asyncstreams, asyncnet
type
  Clients* = ref seq[(AsyncSocket, FutureStream[(AsyncSocket, string)])]
var sseClientList {.global.}: Clients
new sseClientList
sseClientList[] = @[]
proc sseHeartbeat(fs: FutureStream, interval: float = 30,){.async.} =
  # A server sent event heartbeat is just a commentline without content.
  # fs: FutureStream to write the heartbeat to. (fsInput to fanOut)
  # interval :in seconds, default = 30s.
  while true:
    await sleepAsync(interval*1000)
    if len(sseClientList[]) > 0:
      await fs.write(":\n\n")
proc sseFanOut(fs: FutureStream[string]){.gcsafe.}= #unsafe!!!
  # distributes incoming data over connected clients
  # callback for fsInput asyncstream
  echo "fanout"
  let (hasContent, data) = waitFor fs.read
  if hasContent:
    echo "len client list at fanout: ", len(sseClientList[])
    if len(sseClientList[]) > 0:
      for f in sseClientList[]:
        let (client, fs) = f
        waitFor fs.write((client, data))
#data written to fsInput will be distributed through fanout to clients
var fsInput = newFutureStream[string](fromProc = "main")
fsInput.callback = sseFanOut
proc sseEmitterCb(fs: FutureStream[(AsyncSocket, string)]) =
  echo "emit"
  # emits data to listening client
  let (hasContent, payload) = waitFor fs.read
  if hasContent:
    let(client, data) = payload
    waitFor client.send(data)
    echo "emitted\n"
proc sse(ctx: Context) {.async, gcsafe.} =
  #echo ctx.request.nativeRequest.client[]
  let now = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
  let headers = {
    "Date": &"{now} GMT",
    "Access-Control-Allow-Origin": "*",
    "Cache-Control": "no-cache",
    "Content-type": "text/event-stream; charset=utf-8",
    "Connection": "keep-alive",
    "Content-Length": ""
  }
  await ctx.respond(Http200, "", headers.initResponseHeaders())
  ctx.handled = true
  sseClientList[].add((ctx.request.nativeRequest.client, newFutureStream[(AsyncSocket, string)](fromProc = "main")))
  sseClientList[][len(sseClientList[]) - 1][1].callback = sseEmitterCb
  await fsInput.write("event: connection\ndata: connected\n\n")
  await fsInput.write(":and now, for something completely different...\n\n")
  # send data through fsInput works for these two,
  # then connection dropped by the client? The first heartbeat never finishes.
let
  env = loadPrologueEnv(".env")
  settings = newSettings(
    debug = true,
    address = env.getOrDefault("address", "192.168.1.4"),
    port = Port(env.getOrDefault("port", 8088)),
    reusePort = false
  )
asyncCheck sseHeartbeat(fsInput, 5)
var app = newApp(settings = settings)
app.addRoute("/sse", sse, HttpGet)
app.run()
a simplified version. Two things went wrong. Somehowe it got into my head a callback for the asyncstream is required. It is not and that makes life simpler. No more need to deal with the connections directly, just link the socket_fd to a specific stream. The other one was a waitFor instead of an await, the waitFor kind of blocked.
What does not work is detecting client disconnects. in Python I'd use os.fstat(fd), fstat(fd) is not available, stat() requires a string and req.client.getFd is not a file descriptor. Is there a nim alternative for fstat?
import asynchttpserver, asyncdispatch, asyncnet
import times, strformat, random, os, tables
var sseClients {.global.}: Table[int, FutureStream[string]]
proc sseHeartbeat(fs: FutureStream, interval: float = 30){.async.} =
  # A server sent event heartbeat is just a commentline without hasContent.
  # fs: FutureStream to write the heartbeat to.
  # interval :in seconds, default = 30s.
  while true:
    await sleepAsync(interval*1000)
    if len(sseClients) > 0:
      await fs.write(":\n\n")
proc sseTime(fs: FutureStream, interval: float = 10){.async.} =
  #a dataprovider
  while true:
    let now = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    if len(sseClients) > 0:
      await fs.write("event: time\ndata: {now}\n\n".fmt)
    await sleepAsync(interval*1000)
proc sseFanOut(fs: FutureStream[string]){.gcsafe.}= #unsafe!
  # distributes incoming data over clients
  # callback for fsInput asyncstream
  let (hasContent, data) = waitFor fs.read
  if hasContent:
    if len(sseClients) > 0:
      for k in sseClients.keys:
        waitFor sseClients[k].write(data)
proc sse(req: Request) {.async, gcsafe.} =
    let now = now().utc.format("ddd, d MMM yyyy HH:mm:ss")
    let headers = {
      "Date": &"{now} GMT",
      "Content-type": "text/plain; charset=utf-8",
      "Access-Control-Allow-Origin": "*",
      "Cache-Control": "no-cache",
      "Content-type": "text/event-stream; charset=utf-8",
      "Connection": "keep-alive",
      "Content-Length": ""
    }
    await req.respond(Http200, "", headers.newHttpHeaders())
    let fd = int(req.client.getFd)
    echo fd
    sseClients[fd] = newFutureStream[string](fromProc = "main")
    var data = "event: connect\ndata: connected\n\n".fmt
    await req.client.send(data)
    while true:
      let (hasContent, data) = await sseClients[fd].read  #not waitFor!
      if hasContent:
        await req.client.send(data)
      echo "."
proc main {.async.} =
  #data written to fsInput will be distributed through fanout to clients
  var fsInput = newFutureStream[string](fromProc = "main")
  fsInput.callback = sseFanOut
  asyncCheck sseHeartbeat(fsInput, 5)
  asyncCheck sseTime(fsInput, 1)
  
  var server = newAsyncHttpServer()
  server.listen(Port(8088), "192.168.1.4")
  echo "192.168.1.4:8088"
  while true:
    if server.shouldAcceptRequest():
      await server.acceptRequest(sse)
    else:
      poll()
asyncCheck main()
runForever()