Note that the shared data is an object graph and may contain ref object types.
The main thing I tried was creating a ring buffer on the coordinator. Items stay in the buffer until all workers are done, and then the coordinator removes the item from the buffer. However, if any type in the graph rooted at an item is a ref object, the worker threads that read it will bump ref counts concurrently and thus corrupt them.
I'd like to keep using ORC if possible. It may be quite involved to ensure there are no cycles in the library.
The data is shaped something like:
type
  ValueKind* = enum
    vkNull
    vkBool
    vkInteger
    vkDouble
    vkString
    vkObject
    vkArray
  Value* {.acyclic.} = ref object
    case kind*: ValueKind
    of vkNull: discard
    of vkBool: boolVal*: bool
    of vkInteger: integerVal*: int64
    of vkDouble: doubleVal*: float64
    of vkString: stringVal*: string
    of vkObject: objectVal*: TableRef[string, Value]
    of vkArray: arrayVal*: seq[Value]
I'd like to pass it to many threads without copying if possible. I can't figure out if passing SharedPtr[Value] over a channel would work or not. It seems like a consumer that accesses one of the internal objectVal or arrayVal fields would end up bumping a ref count unsafely.
I guess worst case I can serialize everything into a byte array at the producer and then deserialize it all at the consumer 😬
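For reference, that worst-case fallback can be fairly small. A minimal sketch, assuming std/json is acceptable as the wire format (the toJson helper below is hypothetical, not part of any library):

```nim
import std/[json, tables]

type
  ValueKind* = enum
    vkNull, vkBool, vkInteger, vkDouble, vkString, vkObject, vkArray
  Value* {.acyclic.} = ref object
    case kind*: ValueKind
    of vkNull: discard
    of vkBool: boolVal*: bool
    of vkInteger: integerVal*: int64
    of vkDouble: doubleVal*: float64
    of vkString: stringVal*: string
    of vkObject: objectVal*: TableRef[string, Value]
    of vkArray: arrayVal*: seq[Value]

proc toJson(v: Value): JsonNode =
  # Recursively flatten the ref graph into a JsonNode tree
  case v.kind
  of vkNull: result = newJNull()
  of vkBool: result = %v.boolVal
  of vkInteger: result = %v.integerVal
  of vkDouble: result = %v.doubleVal
  of vkString: result = %v.stringVal
  of vkObject:
    result = newJObject()
    for k, child in v.objectVal:
      result[k] = toJson(child)
  of vkArray:
    result = newJArray()
    for child in v.arrayVal:
      result.add toJson(child)

let root = Value(kind: vkArray,
  arrayVal: @[Value(kind: vkInteger, integerVal: 13), Value(kind: vkNull)])
let wire = $toJson(root)  # a plain string: value semantics, safe to move over a channel
echo wire
```

A plain string has no shared ref counts, so it can be moved over a channel safely; each consumer then rebuilds its own private graph with parseJson.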
Ditch the refs entirely.
import std/[tables, strformat]

type
  ValueId* = distinct int
  ValueKind* = enum
    vkNull
    vkBool
    vkInteger
    vkDouble
    vkString
    vkObject
    vkArray
  Value* {.acyclic.} = ref object
    case kind*: ValueKind
    of vkNull: discard
    of vkBool: boolVal*: bool
    of vkInteger: integerVal*: int64
    of vkDouble: doubleVal*: float64
    of vkString: stringVal*: string
    of vkObject: objectVal*: Table[string, ValueId]
    of vkArray: arrayVal*: seq[ValueId]
  Graph* = object
    values: seq[Value]
    root*: ValueId

proc add*(gr: var Graph, val: sink Value): ValueId =
  let valId = ValueId(gr.values.len)
  gr.values.add(move val)
  valId

proc get*(gr: var Graph, valId: ValueId): lent Value =
  gr.values[int(valId)]

proc repr*(gr: Graph, valId: ValueId): string =
  let p = addr gr.values[int(valId)]
  case p.kind
  of vkNull: "null"
  of vkBool: (if p.boolVal: "true" else: "false")
  of vkInteger: "{p.integerVal}".fmt
  of vkDouble: "{p.doubleVal}".fmt
  of vkString: p.stringVal
  of vkObject:
    var s = ""
    for key, val in p.objectVal:
      if s.len > 0:
        s = s & ", "
      s = s & key & " -> " & repr(gr, val)
    "{" & s & "}"
  of vkArray:
    var s = ""
    for val in p.arrayVal:
      if s.len > 0:
        s = s & ", "
      s = s & repr(gr, val)
    "[" & s & "]"

proc repr*(gr: Graph): string =
  assert int(gr.root) > 0
  repr(gr, gr.root)

var gr = Graph()
let id1 = gr.add Value(kind: vkInteger, integerVal: 13)
let id2 = gr.add Value(kind: vkNull)
let id3 = gr.add Value(kind: vkArray, arrayVal: @[id1, id2])
let id4 = gr.add Value(kind: vkDouble, doubleVal: 123.4)
let id5 = gr.add Value(kind: vkObject, objectVal: {"aaa": id4, "bbb": id3}.toTable)
gr.root = id5
echo repr(gr)
You can go even further and drop the Table and seq in the nodes, and instead use something like first, last: ValueId links. Instead of strings you can also use something like an IdentifierId.
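To sketch that suggestion (my reading of it, with identifier interning left out): each node carries firstChild/lastChild plus a nextSibling link, so the node type contains no seq or Table at all.

```nim
type
  ValueId = distinct int32
  NodeKind = enum nkInt, nkArray   # trimmed to two kinds for the sketch
  Node = object                    # plain value type: no seq/Table inside
    kind: NodeKind
    intVal: int64
    firstChild, lastChild: ValueId # children of an array node
    nextSibling: ValueId           # next child of this node's parent

const nilId = ValueId(-1)

proc newNode(kind: NodeKind, intVal = 0'i64): Node =
  Node(kind: kind, intVal: intVal,
       firstChild: nilId, lastChild: nilId, nextSibling: nilId)

proc addChild(nodes: var seq[Node], parent: ValueId, child: Node): ValueId =
  ## Appends `child` to storage and links it as the last child of `parent`.
  result = ValueId(int32(nodes.len))
  nodes.add child
  let p = int(parent)
  if int(nodes[p].firstChild) < 0:
    nodes[p].firstChild = result
  else:
    nodes[int(nodes[p].lastChild)].nextSibling = result
  nodes[p].lastChild = result

var nodes: seq[Node]
nodes.add newNode(nkArray)                        # id 0 is the root
discard nodes.addChild(ValueId(0), newNode(nkInt, 1))
discard nodes.addChild(ValueId(0), newNode(nkInt, 2))

# Walking the children needs only index chasing, no GC'd containers:
var cur = nodes[0].firstChild
while int(cur) >= 0:
  echo nodes[int(cur)].intVal
  cur = nodes[int(cur)].nextSibling
```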
Gave this a shot. I converted my structure to an object instead of a ref object, but it still contained seq, Table and string fields, and those also caused segfaults when crossing thread boundaries.
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/pure/collections/tables.nim(900) test_replica_integration
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/system/orc.nim(553) nimDecRefIsLastCyclicStatic
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/system/orc.nim(509) rememberCycle
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/system/orc.nim(157) unregisterCycle
SIGSEGV: Illegal storage access. (Attempt to read from nil?) /Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/system/orc.nim(157) unregisterCycle
That's due to the ORC cycle collector running. It looks like it's triggered by a ref destroy operation, which calls into ORC cycle collection. I'm unsure where that would come from, since you said you switched to value types and not refs. It may be hidden in the table, or you missed a ref somewhere. Note you can mark refs as non-cyclic using {.acyclic.}.
If your graph is acyclic, like a DAG, then --mm:atomicArc would be a good option. You can use your original buffer method and you're basically done. Though in that case a value object + SharedPtr[T] would work as well.
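As a minimal sketch of the DAG case (assumes the nim-lang/threading package is installed via `nimble install threading` and the program is compiled with --mm:atomicArc):

```nim
import threading/smartptrs

type Payload = object
  nums: seq[int]

# newSharedPtr moves the payload into shared memory guarded by an atomic
# reference count; copies of `p` can be handed to worker threads and the
# payload is destroyed exactly once, after the last copy goes away.
var p = newSharedPtr(Payload(nums: @[1, 2, 3]))
echo p[].nums
```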
Assuming that it is cyclic you still have a few options I can see:
Note that both 1 & 2 would require manually breaking the cycle. Not too hard if you can destroy the whole graph at once.
Next you can do:
These options assume that the whole graph can be destroyed at once as well, e.g. no shared refs or pointers between graphs for different jobs. In these cases you're essentially building a kind of region allocator with varying degrees of manual allocation.
My choice would probably be 4 or 3. You don't need to worry about pointer safety or whatnot if the coordinator is the one that handles destroying the values after threads are done. Also you wouldn't share the seq or buffer itself, but a handle or pointer to it, or a global var.
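A rough sketch of that coordinator-owned arrangement, with Graph reduced to a stand-in value type: the worker only ever sees a raw ptr, so no ref counts are touched, and destruction happens on the coordinator after joinThread.

```nim
import std/typedthreads

type Graph = object
  values: seq[int]   # stand-in for the real node storage
  total: int         # slot the worker writes its result into

proc worker(gp: ptr Graph) {.thread.} =
  # Read through the raw pointer only; the coordinator guarantees the
  # Graph outlives this thread, so no lifetime management is needed here.
  var sum = 0
  for v in gp.values:
    sum += v
  gp.total = sum

var gr = Graph(values: @[1, 2, 3])
var th: Thread[ptr Graph]
createThread(th, worker, addr gr)
joinThread(th)   # only after this may the coordinator destroy gr
echo gr.total
```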
PS: if you want to use a channel to share updates, be sure to use the threading lib I linked above. It supports ARC/ORC and handles copying SharedPtr[T] or refs for you.
After some more debugging, the current problem (the segfault in tables.nim) is not related to how I'm passing data.
I create a worker thread, pass it my data, let it process. The thread returns some numbers to me when it completes and everything looks great.
I can repeat this many times with many different threads in a pool and all my tests pass.
The problem does not occur until I join the worker thread(s) and shut down the program. At that point, I get the stack I shared earlier:
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/pure/collections/tables.nim(900) test_replica_integration
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/system/orc.nim(553) nimDecRefIsLastCyclicStatic
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/system/orc.nim(509) rememberCycle
/Users/mlaw/.choosenim/toolchains/nim-2.2.6/lib/system/orc.nim(157) unregisterCycle
SIGSEGV: Illegal storage access. (Attempt to read from nil?)
Weirdly, putting a GC_fullCollect() call in the worker as the last thing it does makes this problem go away... The reason I thought of GC_fullCollect is that I vaguely remember reading that cycle detection is not immediate and may happen after some delay. So my theory was that Nim was trying to collect cycles created by the already-joined threads. 🤷‍♂️
But they don't?
import std/[os, locks, tables, strformat]

type Payload = object
  val: seq[Table[string, seq[int]]]

proc `=copy`(a: var Payload, b: Payload) {.error.}

const
  N = 32
  M = 1000

var
  lock: Lock
  buf: array[N, Payload]
  head, tail: int

initLock(lock)

proc tryPush(p: var Payload): bool {.gcsafe.} =
  acquire(lock)
  defer: release(lock)
  if tail - head >= N:
    return false
  {.cast(gcsafe).}:
    buf[tail mod N] = move p
  tail += 1
  true

proc tryPop(p: var Payload): bool {.gcsafe.} =
  acquire(lock)
  defer: release(lock)
  if tail - head <= 0:
    return false
  {.cast(gcsafe).}:
    p = move buf[head mod N]
  head += 1
  true

proc producer() {.thread.} =
  for i in 0..<M:
    var p = Payload(val: @[{"{i}".fmt: @[i]}.toTable])
    while not tryPush(p):
      sleep(1)

proc consumer() {.thread.} =
  for i in 0..<M:
    var p: Payload
    while not tryPop(p):
      sleep(1)
    doAssert p == Payload(val: @[{"{i}".fmt: @[i]}.toTable])

proc main() =
  var
    thProducer: Thread[void]
    thConsumer: Thread[void]
  createThread(thProducer, producer)
  createThread(thConsumer, consumer)
  joinThread(thProducer)
  joinThread(thConsumer)

main()
Also, this nim-2.2.6/lib/pure/collections/tables.nim(900) from your stack trace points to proc newTable*[A, B](initialSize = defaultInitialSize): TableRef[A, B] in the source code. Are you sure you're not using any refs?

Yeah, so it turns out I had a global table ref set up in the library code to reduce allocations in the common case where a "node" has no "relationships", since this engine processes ~40-100k rows per second per core.
# Reusable empty relationships table to avoid per-node allocation
# Since nodes from Sources don't have relationships until Join adds them,
# we can reuse a single empty table instance across all nodes
let emptyNodeRelationships* = newTable[string, proc(): Stream[Node] {.closure.}]()
fml. Swapping that to a thread-local var fixed everything.
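For anyone hitting the same thing, the fix can be sketched like this (Stream and Node are stand-ins for the library's real types): each thread lazily creates its own cached empty table via {.threadvar.}, so no TableRef is ever shared between thread-local heaps.

```nim
import std/tables

type
  Node = object                # stand-in for the library's Node type
  Stream[T] = object           # stand-in for the library's Stream type

# threadvars start zeroed, so each thread sees nil until first use
var emptyNodeRelationships {.threadvar.}:
  TableRef[string, proc(): Stream[Node] {.closure.}]

proc getEmptyNodeRelationships*(): TableRef[string, proc(): Stream[Node] {.closure.}] =
  # Allocate lazily, once per thread; the ref never crosses a thread
  # boundary, so ORC's bookkeeping stays consistent.
  if emptyNodeRelationships == nil:
    emptyNodeRelationships = newTable[string, proc(): Stream[Node] {.closure.}]()
  emptyNodeRelationships
```

The per-thread allocation costs one empty table per thread instead of one per process, which still avoids the per-node allocation the original global was there to prevent.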