Hi All,
Thank you for Nim. I have read the manual carefully for information about type constraints and could not answer this. I have written a DBWorker thread that writes objects to a database in batches. I'm using norm. Those objects are transmitted to the thread by a channel which is in global memory. I'd like to 'modularize' the DBWorker thread. Therefore, I'd like to send any object descended from 'Model' to the channel and have it be written.
I've put the whole module below, less some of the imported modules which I also wrote and which are not relevant. But to narrow in on a segment of the code:
proc asyncSend*[T:Model](item: T): Future[void] {.async.} =
  while not pendingWriteListings.trySend(item):
    await sleepAsync(5000)
Yields;
/home/ben/Workspace/tm/src/persistence/dbwriter.nim(65, 20) Error: type mismatch: got 'Model' for 'tmpTupleAsgn_3271560759[1]' but expected 'ListingObj = object'
make[1]: *** [Makefile:52: bin/embedder] Error 1
make[1]: Leaving directory '/home/ben/Workspace/tm'
ListingObj has type definition (plus omitted fields);
ListingObj* = object of Model # (N)ORM for saving in sqlite
Makes complete sense. Model is not the same as ListingObj. My questions are:
0) Is it possible to do what I'm asking?
1) How do I specify 'object that is a child of Model' as a constraint on my generic parameter T?
2) When I call pendingWriteListings.recv() later, I will only have a 'Model' object, right? And somehow I'd need to cast it to the 'subclass' (forgive my terminology here, I'm not sure what is appropriate). So I need to do something there also.
I'm still coming to understand the manual sections on dynamic dispatch and methods. I know there is some conceptual crossover here, and with the term 'reflection' also.
I also tried something along these lines;
proc asyncSend*[T:object of Model](item: T): Future[void] {.async.} =
  while not pendingWriteListings.trySend(item):
    await sleepAsync(5000)
Which did not work...
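For reference, the runtime check that question 2 is circling around can be sketched as below. This is only a minimal illustration of the type side of things, assuming ref object types: Base, Listing, Seller and describe are hypothetical stand-ins, not norm's actual Model or my real types. It says nothing about whether such refs are safe to pass through a channel, which depends on the memory management notes in the module further down.

```nim
# Hypothetical stand-ins: Base plays the role of a common parent type,
# Listing and Seller the role of user-defined descendants.
type
  Base = ref object of RootObj
    id: int
  Listing = ref object of Base
    title: string
  Seller = ref object of Base
    name: string

proc describe(m: Base): string =
  ## Inspects the runtime type of `m` with `of`, then converts down.
  if m of Listing:
    let l = Listing(m)          # checked conversion, safe after the `of` test
    result = "listing: " & l.title
  elif m of Seller:
    result = "seller: " & Seller(m).name
  else:
    result = "plain model #" & $m.id
```

A proc that takes the parent ref type accepts any descendant, and the `of` operator recovers which one it was at runtime; for example, describe(Listing(id: 1, title: "lamp")) takes the first branch.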
## Separate thread that writes to the db from a buffered channel. The purpose is
## to keep the main thread, which sends the work, from waiting on db operations
# Each thread has its own heap. Globals are shared. Passing a ref to a thread
# means passing it the memory address of a data structure in the heap of a
# foreign thread. By design, this is unsupported. Therefore, you cannot pass a
# ref to a thread.
#
# This heap per thread assertion holds for the memory management strategy 'refc'.
# For 'orc', 'boehm' it is not true and there is one heap shared over threads.
# We stick with heap per thread assumption for simplicity and safety.
#
# Threads are for *concurrent* execution. When threads share a core, the
# kernel maximizes utilization of that core by switching between threads, so
# that unavoidable idle time in one thread is covered by activity in another.
#
# Thread objects take one argument, T, which represents data passed into a
# thread. Thread arguments are always singular. If you want multiple, use a T
# that contains multiple data fields.
#
# A thread is started with createThread (non blocking), completed with joinThread
# (blocking).
# - proc createThread[TArg](t: var Thread[TArg];
#     tp: proc (arg: TArg) {.thread, nimcall.}; param: TArg)
# - proc joinThread[TArg](t: Thread[TArg]) {.inline.}
#
# Note that the procedure tp has a {.thread.} pragma
import norm/[model,sqlite]
import persistence/[model,orm]
import std/[paths,asyncdispatch,logging,times,monotimes]
import std/[os,strformat] # sleep and the & format strings used in workDb

const WORKER_DB_BATCHSIZE = 8192

type ModelSubclass = object of Model

# must be in the global memory, in order to be read from multiple threads
var pendingWriteListings*: Channel[Model]

proc initChannel*() =
  pendingWriteListings.open(2*WORKER_DB_BATCHSIZE)

proc closeChannel*() =
  pendingWriteListings.close()

type WorkDbArg*[T: Model] = object
  dbpath: Path
  globalChannel: Channel[T]

proc asyncSend*[T:Model](item: T): Future[void] {.async.} =
  while not pendingWriteListings.trySend(item):
    await sleepAsync(5000) # yield to the dispatcher before retrying
proc workDb*(dbPath: Path) {.thread.} =
  var px: int = 0
  info "Starting db worker"
  var
    db: DbConn = open(dbPath.string, "", "", "")
    queued: bool
    item: ListingObj # ref Model object
    items: seq[Listing]
    now: MonoTime
    cycleDelta: Duration
  db.createTables(Listing()) # prep tables
  while peek(pendingWriteListings) > -1: # channel not closed (it may be empty)
    now = getMonoTime()
    (queued, item) = pendingWriteListings.tryRecv
    while queued and items.len<WORKER_DB_BATCHSIZE:
      items.add item.toRef
      (queued,item) = pendingWriteListings.tryRecv
    if queued:
      items.add item.toRef # pickup final item
    if items.len>0:
      echo &"Database worker inserting {items.len} listings"
      try:
        db.transaction: db.insert(items, conflictPolicy=cpReplace)
      except Exception as e: echo (repr(e))
    # avoid hammering with peek for channel of low load/empty
    if items.len<WORKER_DB_BATCHSIZE:
      cycleDelta=getMonoTime() - now
      if cycleDelta < WORKER_DB_BUFFER_FILL_PAUSE:
        inc px
        sleep((WORKER_DB_BUFFER_FILL_PAUSE-cycleDelta).inMilliseconds.int)
    if items.len>0: items = @[]
  info &"Closing db worker, {px} waits performed"
  db.close()
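For context, here is a stripped-down sketch of how a worker like the one above is started and joined, using the createThread and joinThread signatures quoted in the header comments. The names jobs, results, worker and runDemo are hypothetical, the channels carry plain ints instead of models, and it assumes threads are enabled (the default in Nim 2).

```nim
# Global channels, visible to both the main thread and the worker.
var jobs: Channel[int]
var results: Channel[int]

proc worker(batchSize: int) {.thread.} =
  ## Receives `batchSize` ints, then reports their sum and exits.
  var total = 0
  var seen = 0
  while seen < batchSize:
    total += jobs.recv()   # blocks until an item is available
    inc seen
  results.send(total)

proc runDemo(): int =
  jobs.open()
  results.open()
  var t: Thread[int]
  createThread(t, worker, 3)   # non blocking; the single argument is 3
  for i in 1..3:
    jobs.send(i)
  joinThread(t)                # blocks until worker returns
  result = results.recv()
  jobs.close()
  results.close()
```

The thread takes exactly one argument, here the batch size; anything more would have to be bundled into an object such as WorkDbArg.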
So I think this is the answer. There is no type that matches any subtype; the compiler must be able to infer the runtime type (which must therefore be concrete) in the semantic pass. I'd need to use a solution that works with this paradigm, for example an object with a 'kind' field or some other device. I need to work with the language here and not try to force a square peg into a round hole.
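A minimal sketch of the 'kind' field idea, assuming an object variant is acceptable in place of inheritance. Record, RecordKind, pending, describe, drain and demo are all hypothetical names for illustration, and the fields are made up; nothing here is norm's API.

```nim
type
  RecordKind = enum
    rkListing, rkSeller
  Record = object
    ## One concrete type for the channel; `kind` selects the payload.
    case kind: RecordKind
    of rkListing:
      title: string
      price: int
    of rkSeller:
      name: string

var pending: Channel[Record]

proc describe(r: Record): string =
  ## Dispatches on the kind field instead of on a runtime subtype.
  case r.kind
  of rkListing:
    result = "listing " & r.title & " @ " & $r.price
  of rkSeller:
    result = "seller " & r.name

proc drain(): seq[string] =
  ## Receives everything currently queued without blocking.
  while true:
    let res = pending.tryRecv()
    if not res.dataAvailable:
      break
    result.add describe(res.msg)

proc demo(): seq[string] =
  pending.open()
  pending.send(Record(kind: rkListing, title: "lamp", price: 12))
  pending.send(Record(kind: rkSeller, name: "ben"))
  result = drain()
  pending.close()
```

Because Record is a single concrete value type, the channel's element type is known at compile time, and the receiving side branches on kind to decide which table to write to.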