While looking for a way to use globals in a multithreaded environment, I (with help from an AI) developed this approach to storing globals in a thread (actually one thread per object type). It may be useful for people attached to their globals.
You can put it in a separate module and export the needed items. It is the result of a test project; maybe I will make a separate repo for it later. I don't know whether it is worth converting it into a Nimble library (I have no experience with that yet).
import std/[tables, random, sequtils, os, options, times, monotimes]
type
  # Example payload types kept in a Store by the demo code further down.
  Page = object
    ## A fetched web page and how it was fetched.
    url, config, lastfetched: string
    timedout: bool
  Person = object
    ## A person with a numeric level.
    name: string
    level: int
type
  # Kinds of request a statekeeper thread understands (see `Command`).
  CommandKind = enum
    Add, Read, Update, Delete, GetAll, GetIDs, Stop
  # A request sent to a statekeeper thread on its command channel, which
  # lets the thread act as a small database of objects.
  # Every kind except `Stop` carries its own `ref` reply channel, created
  # locally by the requesting proc, so each answer goes back to exactly that
  # caller. Field names differ per branch because the fields of an object
  # variant must be unique across all branches.
  Command[T] = object
    case kind: CommandKind
    of Add:
      dataob: T                          # object to store
      reply_ID_chob: ref Channel[string] # receives the newly generated ID
    of Read:
      rd_idst: string                         # ID to look up
      rd_reply_chob: ref Channel[(string, T)] # receives (message, object)
    of Update:
      upd_idst: string                    # ID of the record to replace
      upd_dataob: T                       # replacement object
      reply_msg_chob: ref Channel[string] # receives a result message
    of Delete:
      del_idst: string                        # ID of the record to remove
      del_reply_msg_chob: ref Channel[string] # receives a result message
    of GetAll:
      reply_table_chob: ref Channel[Table[string, T]] # receives the whole table
    of GetIDs:
      reply_allids_chob: ref Channel[seq[string]] # receives every stored ID
    of Stop:
      discard
# A Store is created per stored object type (e.g. one for Page, one for
# Person); `initStore` opens its channel and starts its statekeeper thread.
type
  Store[T] = ref object
    ch: ref Channel[Command[T]]         # command channel read by the thread
    th: Thread[ref Channel[Command[T]]] # statekeeper thread; its argument is `ch`
# Seed std/random's global generator once at start-up so that the IDs
# produced by genRandIdStr differ from run to run.
randomize()

proc genRandIdStr(len: int): string =
  ## Returns a random ID of `len` lowercase letters, e.g.
  ## `let idst = genRandIdStr(8)`. Vowels appear several times in the
  ## alphabet so the IDs are easier to read. A non-positive `len` gives "".
  ## Draws from std/random's global generator (the one `randomize` seeds).
  const alphabet = "aaabcdeeefghiiijklmnooopqrstuuuvwxyz"
  for _ in 1 .. len:
    result.add alphabet[rand(alphabet.high)]
proc statekeeper[T](ch: ref Channel[Command[T]]) {.thread.} =
  ## Thread procedure that stands in for global state. It owns a local
  ## `Table[string, T]` (ID string -> object) that no other code can reach,
  ## and serves it to other threads: it blocks on `ch` for a `Command[T]`,
  ## applies it, and answers on the `ref` reply channel carried inside the
  ## command (dereferenced with `[]`). Returns when a `Stop` command arrives.
  ##
  ## Intended for slowly changing data; keep fast-changing data in locals.
  var
    storeta: Table[string, T]
    messagest: string
  while true:
    let cmdob = ch[].recv()
    case cmdob.kind
    of Add:
      # Store the object under a fresh ID and reply with that ID.
      # Re-draw on collision so an existing record is never overwritten.
      var idst = genRandIdStr(10)
      while idst in storeta:
        idst = genRandIdStr(10)
      storeta[idst] = cmdob.dataob
      cmdob.reply_ID_chob[].send(idst)
    of Read:
      # Reply with (message, object); an unknown ID yields default(T).
      if cmdob.rd_idst in storeta:
        messagest = "Success in reading ID: " & cmdob.rd_idst
        cmdob.rd_reply_chob[].send((messagest, storeta[cmdob.rd_idst]))
      else:
        messagest = "Failure in reading. Could not find ID: " & cmdob.rd_idst & " - Returning empty record: "
        cmdob.rd_reply_chob[].send((messagest, default(T)))
    of Update:
      # Replace the record with the given ID and reply with a result message.
      if cmdob.upd_idst in storeta:
        storeta[cmdob.upd_idst] = cmdob.upd_dataob
        cmdob.reply_msg_chob[].send("Success in updating record " & cmdob.upd_idst)
      else:
        cmdob.reply_msg_chob[].send("Failure in updating record. Could not find ID-string!")
    of Delete:
      # Remove the record with the given ID and reply with a result message.
      if cmdob.del_idst in storeta:
        storeta.del(cmdob.del_idst)
        cmdob.del_reply_msg_chob[].send("Success in deleting record " & cmdob.del_idst)
      else:
        cmdob.del_reply_msg_chob[].send("Failure in deleting record. Could not find ID-string!")
    of GetAll:
      # Reply with a copy of the whole table (costly for large stores).
      cmdob.reply_table_chob[].send(storeta)
    of GetIDs:
      # Reply with every stored ID as a seq[string].
      cmdob.reply_allids_chob[].send(toSeq(storeta.keys))
    of Stop:
      echo "stopping statekeeper-thread..."
      break
proc initStore[T](): Store[T] =
  ## Creates a `Store[T]`: allocates and opens its command channel, then
  ## starts the statekeeper thread that serves requests sent on it.
  result = Store[T](ch: new(Channel[Command[T]]))
  result.ch[].open()
  createThread(result.th, statekeeper[T], result.ch)
proc recvWithTimeout[T](ch: ref Channel[T], ms: int): Option[T] =
  ## Polls `ch` (about once per millisecond) until a message arrives or `ms`
  ## milliseconds have passed. Returns `some(msg)` on success and `none(T)`
  ## on timeout; the channel is always checked at least once.
  ##
  ## `tryRecv` tests and dequeues in a single step (no gap between checking
  ## and receiving), and the deadline uses a monotonic clock, so wall-clock
  ## adjustments cannot shorten or stretch the timeout.
  let deadline = getMonoTime() + initDuration(milliseconds = ms)
  while true:
    let (gotIt, msg) = ch[].tryRecv()
    if gotIt:
      return some(msg)
    if getMonoTime() > deadline:
      return none(T)
    sleep(1)
# all API commands for each object-type
proc addObj[T](storob: Store[T], myob: T): string =
  ## Hands `myob` to the statekeeper thread of `storob`, which files it
  ## under a newly generated ID, and returns that ID.
  ## Blocks until the statekeeper answers on a private reply channel.
  let idReply = new(Channel[string])
  idReply[].open()
  let request = Command[T](kind: Add, dataob: myob, reply_ID_chob: idReply)
  storob.ch[].send(request)
  result = idReply[].recv()
  idReply[].close()
proc readObj[T](storob: Store[T], myidst: string): (string, T) =
  ## Asks the statekeeper thread of `storob` for the object stored under
  ## `myidst` and blocks until it answers. Returns `(message, object)`.
  ##
  ## A private reply channel is allocated with `new` for every call. Because
  ## it is a `ref`, the statekeeper answers on this very channel; the `[]`
  ## dereference gives access to the channel behind the managed pointer.
  let answerCh = new(Channel[(string, T)])
  answerCh[].open()
  let request = Command[T](kind: Read, rd_idst: myidst, rd_reply_chob: answerCh)
  storob.ch[].send(request)
  result = answerCh[].recv()
  answerCh[].close()
proc readObjTimed[T](storob: Store[T], myidst: string; timeout = -1): Option[(string, T)] =
  ## Like `readObj`, but waits at most `timeout` milliseconds for the
  ## statekeeper's reply, so a stuck statekeeper cannot hang the caller.
  ## A negative `timeout` waits forever. Returns `some((message, object))`
  ## when a reply arrived and `none` on timeout; unwrap with `.get`.
  ##
  ## On timeout the reply channel is deliberately NOT closed: the statekeeper
  ## still holds a reference to it and will send its answer later, and
  ## `close` frees the channel's resources, so that late `send` would hit a
  ## closed channel. The abandoned channel is leaked instead.
  var reply_chob = new(Channel[(string, T)])
  reply_chob[].open()
  storob.ch[].send(Command[T](kind: Read, rd_idst: myidst, rd_reply_chob: reply_chob))
  if timeout < 0:
    result = some(reply_chob[].recv())
  else:
    result = recvWithTimeout(reply_chob, timeout)
  if result.isSome:
    # The single reply has been consumed, so the channel can be freed.
    reply_chob[].close()
proc updateObj[T](storob: Store[T], myidst: string, myob: T): string =
  ## Asks the statekeeper thread of `storob` to replace the object stored
  ## under `myidst` with `myob` and returns its success/failure message.
  ## Blocks until the statekeeper answers on a private reply channel.
  ##
  ## `storob` is a `ref` that is never reassigned here, so it is taken by
  ## value (like `addObj`/`readObj`); this also accepts `let` stores.
  var reply_chob = new(Channel[string])
  reply_chob[].open()
  storob.ch[].send(Command[T](kind: Update, upd_idst: myidst, upd_dataob: myob, reply_msg_chob: reply_chob))
  result = reply_chob[].recv()
  reply_chob[].close()
proc deleteObj[T](storob: Store[T], myidst: string): string =
  ## Asks the statekeeper thread of `storob` to delete the object stored
  ## under `myidst` and returns its success/failure message.
  ## Blocks until the statekeeper answers on a private reply channel.
  ##
  ## `storob` is a `ref` that is never reassigned here, so it is taken by
  ## value (like `addObj`/`readObj`); this also accepts `let` stores.
  var reply_chob = new(Channel[string])
  reply_chob[].open()
  storob.ch[].send(Command[T](kind: Delete, del_idst: myidst, del_reply_msg_chob: reply_chob))
  result = reply_chob[].recv()
  reply_chob[].close()
proc getAllIDs[T](storob: Store[T]): seq[string] =
  ## Returns every ID currently held by the statekeeper thread of `storob`.
  ## Blocks until the statekeeper answers on a private reply channel.
  let idsReply = new(Channel[seq[string]])
  idsReply[].open()
  let request = Command[T](kind: GetIDs, reply_allids_chob: idsReply)
  storob.ch[].send(request)
  result = idsReply[].recv()
  idsReply[].close()
proc close[T](storob: Store[T]) =
  ## Shuts `storob` down: tells its statekeeper thread to stop, waits for
  ## that thread to finish, then closes the command channel.
  let stopCmd = Command[T](kind: Stop)
  storob.ch[].send(stopCmd)
  storob.th.joinThread()
  storob.ch[].close()
# ================== MAIN ==================
func createPage(urlst, configst, last: string; timedoutbo: bool): Page =
  ## Builds a `Page` from its field values, ready to be passed to `addObj`.
  ## Pure (no side effects), hence a `func`.
  Page(
    url: urlst,
    config: configst,
    lastfetched: last,
    timedout: timedoutbo
  )
# Demo: one store (and so one statekeeper thread) per object type.
var
  page_storob = initStore[Page]()
  person_storob = initStore[Person]()
echo "Page ID: ", page_storob.addObj(Page(url: "www.ever.net", config: "basic"))
echo "Person ID: ", person_storob.addObj(Person(name: "John", level: 3))
discard page_storob.addObj(createPage("www.then.org", "json", "yesterday", true))
discard page_storob.addObj(createPage("www.now.org", "config", "today", false))
discard page_storob.addObj(createPage("www.soon.org", "xml", "tomorrow", false))
# NOTE(review): appears unnecessary - each addObj above already blocked
# until the statekeeper replied, so all pages are stored by now.
sleep(500)
let allIDsq = page_storob.getAllIDs()
echo "Reading pages.."
for keyst in allIDsq:
  # someobject_storob.readObj(keyst) gives tuple: (result-message, page-object)
  echo page_storob.readObj(keyst)
echo "Reading non-existant ID.."
echo page_storob.readObj("not_there")
echo "\pExample update:"
# Update example: first fetch a copy of the page object.
let keyst = allIDsq[0] # an example ID
var upd_pageob: Page = page_storob.readObj(keyst)[1]
# Change the necessary fields on the local copy.
upd_pageob.url = "www.once.org"
upd_pageob.timedout = true
# Send the modified copy back to the statekeeper thread.
echo page_storob.updateObj(keyst, upd_pageob)
# Read it again to verify the update.
echo page_storob.readObj(keyst)
# Delete a page object, then show that reading it now fails.
echo "\pExample deletion:"
echo page_storob.deleteObj(keyst)
echo page_storob.readObj(keyst)
#echo page_storob.read(id1)
# Timed read: prints an Option; indexes [0], so the person store must not be empty.
echo person_storob.readObjTimed(person_storob.getAllIDs[0], timeout = 100)
person_storob.close()

I was at the same level as you about a month ago, experimenting with multithreading. By all means, keep learning about channels, but to help you, I’ll share this:
I designed a system with three threads passing messages linearly from one to the next. At first, it sounded simple and avoided a circular dependency that could lead to deadlock. In reality, though, channels alone weren’t enough; you also need additional state management.
How do the threads communicate failure? You end up needing global state for both cancellation and abort conditions. Do you need to model sentinels for normal stopping conditions? Do you need to drain the channels afterward?
I tried adding stop tokens and draining mechanisms to the threading/channels module, but in the end, switching to locks was much simpler. Also, while threading/channels doesn’t have this issue, other channel implementations distinguish channel ownership, so embedding stop tokens directly in channels is discouraged and requires an extra mechanism.
In the end, I also removed the extra thread entirely and found the design was much cleaner with locks and simpler data structures.
@arak
In your own book on page 258 you provide an example with channels. When you expose Channels as language-construct, you are supposed to support and improve them. Otherwise you must deprecate them.
I assumed the channels were a managed language-construct. AI told me they have built-in queuing, locking, etc. But maybe the AI hallucinated.
AI told me it was the modern way to do things for multithreaded business logic (not for games, because of performance issues).
I find locks hard to work with and very unintuitive. For me locks are the sucking ones :-)
I'd like to invite you to write a threaded data store with a similar API, using locks instead of channels.
When you expose Channels as language-construct, you are supposed to support and improve them. Otherwise you must deprecate them.
No, I don't have to do anything like that, we offer working channels, it's just that even "working" channels are incredibly overrated to the point that people simply neglect my facts and live in denial. It's my job to expose every single bad idea computer science has produced. Besides, sometimes channels are unavoidable; they are the only thing that exists for computer-to-computer communication.
I find locks hard to work with and very unintuitive. For me locks are the sucking ones :-)
Here you go, no thinking involved:
## Thread-safe `Table` using malebolgia's fair `TicketRWLock` (reader-writer spinlock
## from `ticketlocks`). Read operations take the read lock; mutating operations take
## the write lock.
##
## `RwTable` is a plain `object` (not `ref`). Pass **`var RwTable`** to every operation
## so the inner `Table` is not copied. For sharing between threads, store the
## `RwTable` in a location every thread can reach (e.g. module var, heap block, channel).
##
## Lookups return plain copies, not `lent`/`var` into storage. The `pairs` iterator
## holds the read lock for the **entire** `for` loop (all `yield` points). Use
## `withReadTable` / `withWriteTable` when you need a `Table` snapshot or full
## `Table` API under a lock.
import std/tables
import malebolgia/ticketlocks
export withReadLock, withWriteLock, acquireRead, releaseRead, acquireWrite, releaseWrite
export initTicketRWLock, TicketRWLock
type
  RwTable*[K, V] = object
    ## A `Table` paired with the reader-writer lock that guards it.
    ## A plain `object`: pass it as `var` so the table is not copied.
    table: Table[K, V] ## the wrapped data
    rw: TicketRWLock   ## lock taken around accesses to `table`
proc initRwTable*[K, V](initialSize = defaultInitialSize): RwTable[K, V] =
  ## Returns a new `RwTable`: an empty inner table sized for `initialSize`
  ## entries, guarded by a freshly initialised reader-writer lock.
  result.table = initTable[K, V](initialSize)
  result.rw = initTicketRWLock()
proc initRwTable*[K, V](t: var RwTable[K, V], initialSize = defaultInitialSize) =
  ## (Re)initialises `t` in place with a new lock and an empty inner table
  ## sized for `initialSize` entries. Takes no lock itself, so do not call
  ## it while other threads may be using `t`.
  t.rw = initTicketRWLock()
  t.table = initTable[K, V](initialSize)
proc toRwTable*[K, V](pairs: openArray[(K, V)]): RwTable[K, V] =
  ## Returns a new `RwTable` whose inner table is `toTable(pairs)`.
  let built = toTable(pairs)
  result = initRwTable[K, V]()
  withWriteLock(result.rw):
    result.table = built
template withReadTable*[K, V](m: var RwTable[K, V]; tabName: untyped; body: untyped) =
  ## Runs `body` while holding the read lock, with a read-only snapshot of
  ## the inner `Table` bound to the caller-chosen name `tabName`.
  ## The snapshot is a `let` copy of the whole table, so it costs a full
  ## table copy per call; use the per-key accessors for single lookups.
  withReadLock(m.rw):
    let `tabName` = m.table # copy taken under the read lock
    body
template withWriteTable*[K, V](m: var RwTable[K, V]; tabName: untyped; body: untyped) =
  ## Runs `body` while holding the write lock, with the inner `Table` bound
  ## to the caller-chosen name `tabName` for full `Table` API access.
  ##
  ## The table is moved out into `tabName` and moved back in `finally`, so
  ## changes made by `body` are kept even if it raises or leaves early
  ## (`break`/`return`). Moving, rather than copying in and back out,
  ## avoids two full table copies per call. While `body` runs, the data
  ## lives only in `tabName`.
  withWriteLock(m.rw):
    var `tabName` = move(m.table)
    try:
      body
    finally:
      m.table = move(`tabName`)
# --- Read lock ---
proc len*[K, V](t: var RwTable[K, V]): int {.inline.} =
  ## Number of entries, read under the read lock.
  withReadLock(t.rw):
    result = len(t.table)

proc `[]`*[K, V](t: var RwTable[K, V], key: K): V {.inline.} =
  ## Copy of the value stored under `key` (read lock held). Like
  ## `tables.[]`, raises `KeyError` when `key` is absent.
  withReadLock(t.rw):
    result = t.table[key]

proc getOrDefault*[K, V](t: var RwTable[K, V], key: K): V {.inline.} =
  ## Copy of the value under `key`, or `default(V)` when absent.
  withReadLock(t.rw):
    result = t.table.getOrDefault(key)

proc getOrDefault*[K, V](t: var RwTable[K, V], key: K, default: V): V {.inline.} =
  ## Copy of the value under `key`, or `default` when absent.
  withReadLock(t.rw):
    result = t.table.getOrDefault(key, default)

proc hasKey*[K, V](t: var RwTable[K, V], key: K): bool {.inline.} =
  ## Whether `key` is present (read lock held).
  withReadLock(t.rw):
    result = t.table.hasKey(key)

proc contains*[K, V](t: var RwTable[K, V], key: K): bool {.inline.} =
  ## Enables `key in t`; same answer as `hasKey`.
  withReadLock(t.rw):
    result = key in t.table

proc `$`*[K, V](t: var RwTable[K, V]): string =
  ## String form of the inner table, built under the read lock.
  withReadLock(t.rw):
    result = $(t.table)
proc `==`*[K, V](a, b: var RwTable[K, V]): bool =
  ## True when the inner tables compare equal. An object is trivially equal
  ## to itself, which also avoids taking its lock twice. Otherwise both read
  ## locks are held, always acquired lower address first so every caller
  ## uses the same lock order.
  if addr(a) == addr(b):
    return true
  if cast[uint](addr(b)) <= cast[uint](addr(a)):
    withReadLock(b.rw):
      withReadLock(a.rw):
        result = a.table == b.table
  else:
    withReadLock(a.rw):
      withReadLock(b.rw):
        result = a.table == b.table
iterator pairs*[K, V](t: var RwTable[K, V]): (K, V) =
  ## Yields copies of each `(key, value)` like `tables.pairs`.
  ## The read lock is acquired before the first element and released in
  ## `finally`, so it stays held for the whole `for` loop, including when
  ## the loop is left early with `break`.
  ## NOTE(review): calling a write operation on `t` inside the loop body
  ## would wait on this still-held read lock - presumably a self-deadlock;
  ## confirm against `TicketRWLock` semantics.
  acquireRead(t.rw)
  try:
    for k, v in pairs(t.table):
      yield (k, v)
  finally:
    releaseRead(t.rw)
# --- Write lock ---
proc `[]=`*[K, V](t: var RwTable[K, V], key: sink K, val: sink V) {.inline.} =
  ## Inserts or overwrites `key -> val` under the write lock.
  withWriteLock(t.rw):
    `[]=`(t.table, key, val)

proc hasKeyOrPut*[K, V](t: var RwTable[K, V], key: K, val: V): bool {.inline.} =
  ## Returns true if `key` is already present; otherwise inserts `val`
  ## (as `tables.hasKeyOrPut`), all under one write lock.
  withWriteLock(t.rw):
    result = t.table.hasKeyOrPut(key, val)

proc add*[K, V](t: var RwTable[K, V], key: sink K, val: sink V) {.inline.} =
  ## Forwards to `tables.add`, which can create duplicate keys.
  # NOTE(review): `tables.add` is deprecated since Nim 1.4 - consider `[]=`.
  withWriteLock(t.rw):
    t.table.add(key, val)

proc del*[K, V](t: var RwTable[K, V], key: K) {.inline.} =
  ## Removes `key` if present (no-op otherwise), under the write lock.
  withWriteLock(t.rw):
    t.table.del(key)

proc pop*[K, V](t: var RwTable[K, V], key: K, val: var V): bool {.inline.} =
  ## Removes `key`; on success stores its value into the caller's `val`
  ## (not a borrow into the table) and returns true, else returns false.
  withWriteLock(t.rw):
    result = t.table.pop(key, val)

proc take*[K, V](t: var RwTable[K, V], key: K, val: var V): bool {.inline.} =
  ## Forwards to `tables.take` under the write lock.
  ## NOTE(review): documented in std/tables as an alias of `pop` - confirm.
  withWriteLock(t.rw):
    result = t.table.take(key, val)

proc clear*[K, V](t: var RwTable[K, V]) {.inline.} =
  ## Removes every entry, under the write lock.
  withWriteLock(t.rw):
    t.table.clear()