For some time now I've been trying to get a little message bus running using FutureStreams. It works when the messages are strings. Full code at https://gist.github.com/ingoogni/b29fde37f74216e57abf7ed7812d2a52
Now I would like to get it working with different message types/kinds. The following works in one direction.
type
  MsgKind = enum
    mkString,
    mkInt,
    mkFloat
  Message* = object
    topic*: string
    id*: string
    msgtype*: string #"p/s", "req/rep", "reply". req/rep requires id
    case msg*: MsgKind
    of mkString: strVal: string
    of mkInt: intVal: int
    of mkFloat: floatVal: float
[...]
proc post*(topic:string, msg:string|int|float, id:string = "", msgtype:string = "p/s"){.async.}=
  var message = Message(
    topic: topic,
    msg: cast[MsgKind](msg),
    id: id,
    msgType: msgType
  )
  echo type(message.msg) #---> this only tells me MsgKind, not what kind it originally was.
  await bus.fsBus.write(message)
But how do I go the other way, from MsgKind back to for example int?
(I also tried it with the big T, but failed. There it is the var bus* that gives me problems)
Message.msg is the enum MsgKind, but msg for the proc is string, int or float. Those types aren't compatible.
You want something like this
type
  MsgKind = enum
    mkString,
    mkInt,
    mkFloat
  Message* = object
    topic*: string
    id*: string
    msgtype*: string #"p/s", "req/rep", "reply". req/rep requires id
    case msg*: MsgKind
    of mkString: strVal: string
    of mkInt: intVal: int
    of mkFloat: floatVal: float
proc post*(topic:string, msg:string|int|float, id:string = "", msgtype:string = "p/s") =
  # `when` is resolved at compile time, once per concrete type of msg
  var message =
    when msg is string:
      Message(
        topic: topic,
        msg: mkString,
        strVal: msg,
        id: id,
        msgType: msgType
      )
    elif msg is int:
      Message(
        topic: topic,
        msg: mkInt,
        intVal: msg,
        id: id,
        msgType: msgType
      )
    elif msg is float:
      Message(
        topic: topic,
        msg: mkFloat,
        floatVal: msg,
        id: id,
        msgType: msgType
      )
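To go the other way, from the stored kind back to the value, you branch on the discriminator field and read the field that belongs to that branch. A minimal sketch, assuming a trimmed-down Message and a hypothetical helper payloadToString that is not part of the original code:

```nim
type
  MsgKind = enum
    mkString, mkInt, mkFloat
  Message = object
    topic: string
    case msg: MsgKind
    of mkString: strVal: string
    of mkInt: intVal: int
    of mkFloat: floatVal: float

# Hypothetical helper: renders the payload as text, whatever its kind.
proc payloadToString(m: Message): string =
  case m.msg
  of mkString: result = m.strVal
  of mkInt: result = $m.intVal
  of mkFloat: result = $m.floatVal

let a = Message(topic: "count", msg: mkInt, intVal: 42)
let b = Message(topic: "time", msg: mkString, strVal: "noon")
echo payloadToString(a)
echo payloadToString(b)
```

Reading a field that does not belong to the active branch (for example a.strVal here) raises a FieldDefect at runtime when field checks are on, so the case on m.msg should come first.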
Do you need to know the different msg types and do something with that knowledge? If not, you can do something like this
type
  Message*[T] = object
    topic*: string
    id*: string
    msgtype*: string #"p/s", "req/rep", "reply". req/rep requires id
    msg*: T
proc post*[T](topic:string, msg: T, id:string = "", msgtype:string = "p/s") =
  var message = Message[T](
    topic: topic,
    msg: msg,
    id: id,
    msgType: msgType
  )
If you want to have better control over when the id is there, you can do something like this
type
  MsgKinds = string | int | float
  MsgType = enum
    mtPS,
    mtRR,
    mtR
  Message*[T: MsgKinds] = object
    topic*: string
    msg*: T
    case msgtype*: MsgType #"p/s", "req/rep", "reply". req/rep requires id
    of mtRR:
      id: string
    else:
      discard
proc post*[T: MsgKinds](topic:string, msg: T, id = "", msgtype = mtPS) =
  # the case statement bounds msgType, so the matching branch field may be set
  var message =
    case msgType
    of mtRR:
      Message[T](
        topic: topic,
        msg: msg,
        id: id,
        msgType: msgType
      )
    else:
      Message[T](
        topic: topic,
        msg: msg,
        msgType: msgType
      )
Thank you so far, something to study. Coming back to bigT, there is a bit more:
type
  Message*[T] = object
    topic*: string
    id*: string
    msgtype*: string #"p/s", "req/rep", "reply". req/rep requires id
    msg*: T
  Subscriber*[T] = object
    fs*: FutureStream[Message[T]]
    subscriptions*: seq[string] #topic strings
    topics*: seq[string] #topic strings
    source*: string #name of proc
    id*: string
  Bus*[T] = object
    subscribers*: Table[string, Subscriber[T]] #table[subscriber id, subscriber object]
    topicSubscribers*: Table[string, seq[string]] #table[topic string, seq[subscriber id's]]
    fsBus*: FutureStream[Message[T]]
#bus is the input stream for the broker.
#all messages to be distributed should be written to this stream.
var bus* = Bus(
  fsBus: newFutureStream[Message[T]](fromProc = "main")
)
this gives an error:
_02.nim(27, 15) Error: object constructor needs an object type [genericBody declared [...]\psbus_02.nim(19, 3)]
case msgtype*: MsgType #"p/s", "req/rep", "reply". req/rep requires id
Ooh, I'm sorry. From your code it shows my choice of variables is not the most elegant. msgtype has no relation to the type of msg. (msgtype controls how the broker works.)
Unless you want a separate bus per msg type, you should go back to the variant form of Message.
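To illustrate why: a generic Bus[T] is only a template for a type, so a variable needs a concrete T, and that fixes the payload type for the whole bus. A minimal sketch, assuming trimmed-down types and the hypothetical names stringBus and demo:

```nim
import asyncdispatch, asyncstreams

type
  Message[T] = object
    topic: string
    msg: T
  Bus[T] = object
    fsBus: FutureStream[Message[T]]

# T has to be chosen when the variable is created; this bus carries strings only.
var stringBus = Bus[string](
  fsBus: newFutureStream[Message[string]](fromProc = "main")
)

# Hypothetical round trip: write one message, then read it back.
proc demo(): Future[string] {.async.} =
  await stringBus.fsBus.write(Message[string](topic: "time", msg: "noon"))
  let (hasContent, m) = await stringBus.fsBus.read()
  if hasContent:
    result = m.msg

echo waitFor demo()
```

Posting an int would need a second Bus[int], which is the "separate bus per msg type" situation. The variant Message keeps a single non-generic bus.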
Thanks for putting the finger on my worries. In retrospect I see what @ynfle meant.
Thanks. (old brain's gettin' slower)
Sadly with the case there seem to be issues:
A proc that posts a message:
proc tenSeconds(){.async.} =
  ## posts time every ten seconds
  while true:
    await post("time", now().utc.format("ddd, d MMM yyyy HH:mm:ss"))
    await sleepAsync(1000) #*10
it uses the post proc as in @ynfle 's first post. On the receiving end:
proc echoTime(){.async.} =
  ## receives all messages with a topic string that starts with time
  let sub = await subscribe("echoTime", @["time"])
  while true:
    let (hasContent, msg) = await sub.fs.read
    if hasContent:
      echo msg[]
      #echo msg.strVal
    else:
      poll()
This results in:
(topic: "time", id: "", msgtype: "p/s", msg: mkString, strVal: "Wed, 18 May 2022 11:57:55")
But when I uncomment the echo msg.strVal I get the error:
Error: undeclared field: 'strVal' for type psbus_01.Message [type declared in [...]\psbus.nim(12, 3)]
It seems as if something is lost in the broker / futurestream.
Another way out could be using JSON, Protocol Buffers et al? Seem a bit much.
It is kind of hard to know what is failing without a complete example. I pieced together this from the various snippets above and it works fine:
import times, asyncstreams, asyncdispatch
type
  MsgKind = enum
    mkString,
    mkInt,
    mkFloat
  Message* = object
    topic*: string
    id*: string
    msgtype*: string #"p/s", "req/rep", "reply". req/rep requires id
    case msg*: MsgKind
    of mkString: strVal: string
    of mkInt: intVal: int
    of mkFloat: floatVal: float
  Subscriber* = object
    fs*: FutureStream[Message]
    subscriptions*: seq[string] #topic strings
    topics*: seq[string] #topic strings
    source*: string #name of proc
    id*: string
  Bus* = object
    fsBus*: FutureStream[Message]
var bus = Bus(fsBus: newFutureStream[Message](fromProc = "main"))
proc post*(topic:string, msg:string|int|float, id:string = "", msgtype:string = "p/s") {.async.} =
  var message =
    when msg is string:
      Message(
        topic: topic,
        msg: mkString,
        strVal: msg,
        id: id,
        msgType: msgType
      )
    elif msg is int:
      Message(
        topic: topic,
        msg: mkInt,
        intVal: msg,
        id: id,
        msgType: msgType
      )
    elif msg is float:
      Message(
        topic: topic,
        msg: mkFloat,
        floatVal: msg,
        id: id,
        msgType: msgType
      )
  await bus.fsBus.write(message)
proc tenSeconds(){.async.} =
  ## posts time every ten seconds
  while true:
    await post("time", now().utc.format("ddd, d MMM yyyy HH:mm:ss"))
    await sleepAsync(1000*10)
proc echoTime() {.async.} =
  while true:
    let (hasContent, msg) = await bus.fsBus.read
    if hasContent:
      echo msg
      echo msg.strVal
    else:
      await sleepAsync(0)
asyncCheck tenSeconds()
waitFor echoTime()
You appear to have built some kind of subscriber system though, but without any details as to how it works it's kind of hard to copy it.
You appear to have built some kind of subscriber system though,
Yes. Working code for just strings is at:
https://gist.github.com/ingoogni/b29fde37f74216e57abf7ed7812d2a52
No idea if the rest of this is sane though, as I didn't really look at the logic of it much
Export the strVal, intVal, and floatVal fields
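A field without the * export marker is private to the module that declares the type, which would explain why echo msg prints strVal while msg.strVal is rejected from another module. A sketch of the type with the branch fields exported, assuming the rest of the bus code stays as it is:

```nim
type
  MsgKind* = enum
    mkString, mkInt, mkFloat
  Message* = object
    topic*: string
    id*: string
    msgtype*: string
    case msg*: MsgKind
    # the trailing * makes each branch field visible to importing modules
    of mkString: strVal*: string
    of mkInt: intVal*: int
    of mkFloat: floatVal*: float

let m = Message(topic: "time", msg: mkString, strVal: "Wed, 18 May 2022 11:57:55")
echo m.strVal
```

In a single file the markers make no visible difference. They only matter once the type is imported, as psbus.nim is in the error message above.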
Aha! (Would always exporting everything be a breaking change for V2?)
Thank you so much for your time.