CPS without allocations:
import errorcodes
type
  CoroutineBase = object of RootObj
    # Type-erased next step. It is a `pointer` because the exact step
    # signature depends on the concrete coroutine type.
    cont: pointer
    # Set to `SkipError` by the final step; the driver only reports a value
    # while this is still `Success`.
    error: ErrorCode

method abort(coro: var CoroutineBase) =
  ## Cancellation hook; concrete coroutines override it.
  discard "to override"

type
  ThisCoroutine = object of CoroutineBase
    x: int  # the counter, kept across suspension points
  # The signature of every step of `ThisCoroutine`; the driver calls `cont`
  # through exactly this type, so every step (cleanup included) must match.
  ThisStep = proc (coro: var ThisCoroutine; a, b: int): int {.nimcall.}

proc cleanup(coro: var ThisCoroutine; a, b: int): int =
  ## Final step: mark the coroutine as finished and stop it.
  ## Returns `int` like every other step, so calling it through `ThisStep`
  ## is well-defined (a void proc called through an int-returning pointer
  ## is not).
  echo "destroying coroutine ", coro.x
  coro.error = SkipError
  coro.cont = nil
  result = coro.x

method abort(coro: var ThisCoroutine) =
  ## Route the next resume into `cleanup`.
  coro.cont = cast[pointer](cleanup)

proc countup(): ThisCoroutine =
  ## Build a coroutine that yields `a`, `a + 1`, ..., `b` (the bounds are
  ## passed on every resume) and then runs `cleanup`.
  proc l3(coro: var ThisCoroutine; a, b: int): int
  proc l1(coro: var ThisCoroutine; a, b: int): int =
    if coro.x > b:
      # Past the upper bound: finish now. Returning `coro.x` here with
      # `error` still `Success` would report `b + 1` as a yielded value.
      return cleanup(coro, a, b)
    coro.cont = cast[pointer](l3)
    return coro.x
  proc l0(coro: var ThisCoroutine; a, b: int): int =
    coro.x = a
    return l1(coro, a, b)
  proc l3(coro: var ThisCoroutine; a, b: int): int =
    inc coro.x
    return l1(coro, a, b)
  result = ThisCoroutine(cont: cast[pointer](l0))

var it = countup()
while it.cont != nil:
  let x = cast[ThisStep](it.cont)(it, 0, 10)
  # Only report values produced while the coroutine is still running.
  if it.error == Success:
    echo x
Now we only need a higher-level interface, maybe like so:
iterator countup(a, b: int): int {.implements: (CoroutineBase, io).} =
  ## Sketch of the desired high-level interface: yields `a`, `a + 1`, ..., `b`.
  ## NOTE(review): `implements` is a hypothetical pragma (not defined here).
  ## The intent is to lower this iterator into a coroutine derived from
  ## `CoroutineBase`, where calls to procs marked `.io` are also yield points.
  var x = a
  while x <= b:
    yield x
    inc x
The `io` here dictates that called procs marked with `.io` also produce yield points... Gah, describing this in detail, and how I arrived at this solution, takes hours... Any questions?
Besides, this is only one of the parts that make the whole system “work”: when one continuation calls another, there must be some wrapper that can substitute the state that was most likely allocated in the first one.
I have an example "manual CPS'ed" proc here
Proc to convert to continuation passing style:
https://github.com/nim-works/cps/blob/7e01e94/talk-talk/manual1_stack.nim
proc foo(n: int) =
  ## Reference proc for the manual CPS transformation: two nested counting
  ## loops that print every `i j` pair for `i, j` in `0 ..< n`, then print
  ## "done". The commented-out `sleep` calls mark where the CPS version
  ## suspends.
  var i = 0
  while n > i:
    # sleep(1)
    var j = 0
    while n > j:
      echo $i & " " & $j
      # sleep(1)
      j += 1
    i += 1
  let j = "done"
  # sleep()
  echo j

foo(3)
The yield locations are inside the nested while loops (the commented-out `sleep` calls).
The following continuation-based (manual) transformation has the same behavior. It performs state substitutions and can be allocated on the stack on the C and C++ backends if there is no stack escape (say, iterator usage) and only trivial types are involved (i.e. no heap-allocated seqs or strings).
# CPS'ed
type
  # Environments for all the split procs. These are analogous to
  # the stack frames in the original proc at the split locations.
  # We modify example 1 to store the whole environment on the stack.
  # This optimization applies when `supportsCopyMem(T)` holds (only
  # trivially copyable fields), which is why the string "done" is replaced
  # by an enum below. This gives the compiler full visibility to optimize
  # the code away.
  Env_foo_0 = object
    n_gensymmed: int
    i_gensymmed: int
  Env_foo_1 = object
    n_gensymmed: int
    i_gensymmed: int
  Env_foo_2 = object
    n_gensymmed: int
    i_gensymmed: int
    j_gensymmed: int
  Env_foo_3 = object
    n_gensymmed: int
    i_gensymmed: int
    j_gensymmed: int
  Env_foo_4 = object
    n_gensymmed: int
    i_gensymmed: int
    j_gensymmed: int
  Env_foo_5 = object
    n_gensymmed: int
    i_gensymmed: int
    j_gensymmed: int
  HackEnvFoo6 = enum
    # No strings for the stack optimization
    Done = "done"
  Env_foo_6 = object
    n_gensymmed: int
    i_gensymmed: int
    j_gensymmed: HackEnvFoo6
  Env_foo_7 = object
    n_gensymmed: int
    i_gensymmed: int
    j_gensymmed: HackEnvFoo6
  # This is an object which is large enough to hold any of the above, and is
  # used for the initial allocation. As a `{.union.}`, all members share the
  # same bytes: a value written through one Env_foo_N view is read back
  # through the next one. This relies on every Env_foo_N listing the shared
  # fields in the same order.
  Env_foo_storage {.union.} = object
    stor_Env_foo_0: Env_foo_0
    stor_Env_foo_1: Env_foo_1
    stor_Env_foo_2: Env_foo_2
    stor_Env_foo_3: Env_foo_3
    stor_Env_foo_4: Env_foo_4
    stor_Env_foo_5: Env_foo_5
    stor_Env_foo_6: Env_foo_6
    stor_Env_foo_7: Env_foo_7
  # The continuation: the next step to run (nil once finished) plus the
  # shared environment storage.
  C = object
    fn: proc(c: var C) {.nimcall.}
    storage: Env_foo_storage
proc noop(c: var C) =
  ## Suspension point that does nothing. The calling step has already stored
  ## its successor in `c.fn`, so returning hands control back to the
  ## trampoline.
  discard
proc sleep(c: var C, seconds: int) =
  ## Stub suspension point: clears `c.fn`, which ends the trampoline loop.
  ## `seconds` is currently ignored.
  c.fn = nil
import typetraits
# The storage union must be trivially copyable; the bootstrap proc only
# allocates the continuation on the stack when `supportsCopyMem` holds.
doAssert Env_foo_storage.supportsCopyMem()
# Split proc forward declarations.
# Ideally these would be "proc foo_0(c: sink C): C", but that crashed;
# `var C` is used instead to allow in-place modification and return.
proc foo_0(c: var C)
proc foo_1(c: var C)
proc foo_2(c: var C)
proc foo_3(c: var C)
proc foo_4(c: var C)
proc foo_5(c: var C)
proc foo_6(c: var C)
proc foo_7(c: var C)
# Bootstrap proc to go from Nim to CPS land. Responsible for allocating the
# continuation and transferring any arguments in
proc foo(n: int): auto =
  ## Allocate the continuation for `foo(n)`, store `n` in the first
  ## environment and point it at the first step `foo_0`.
  ## Returns a stack value `C` when `C` is trivially copyable, otherwise a
  ## heap `ref C`; the trampoline has a matching `when` for both cases.
  # `auto` lets each `when` branch return its own type; a fixed `C` return
  # type cannot accept the `ref C` branch.
  when supportsCopyMem(C):
    var c = C()
  else:
    var c = (ref C)()
  echo sizeof(c.storage)
  c.storage.stor_Env_foo_0.n_gensymmed = n
  c.fn = foo_0
  return c
# CPS functions
# Declares a local template `id` aliasing the `id_gensymmed` field of the
# `stor_T` view of `c.storage`; `c` is resolved at the call site. The field
# name is built with an explicit `_`, the same way as `stor _ T`.
template injectVar(T, id: untyped) =
  template id(): untyped = c.storage.`stor _ T`.`id _ gensymmed`
proc foo_0(c: var C) =
  ## Entry step: `var i = 0`, then continue with the outer loop test.
  injectVar(Env_foo_0, i)
  injectVar(Env_foo_0, n)
  c.fn = foo_1
  i = 0
proc foo_1(c: var C) =
  ## Outer loop test (`while i < n`): either suspend and then enter the inner
  ## loop, or leave the outer loop.
  injectVar(Env_foo_1, n)
  injectVar(Env_foo_1, i)
  if i >= n:
    c.fn = foo_6
  else:
    c.fn = foo_2
    noop(c)
proc foo_2(c: var C) =
  ## Inner loop setup: `var j = 0`, then continue with the inner loop test.
  injectVar(Env_foo_2, j)
  injectVar(Env_foo_2, i)
  injectVar(Env_foo_2, n)
  c.fn = foo_3
  j = 0
proc foo_3(c: var C) =
  ## Inner loop test (`while j < n`): either print the pair and suspend, or
  ## leave the inner loop.
  injectVar(Env_foo_3, n)
  injectVar(Env_foo_3, i)
  injectVar(Env_foo_3, j)
  if not (j < n):
    c.fn = foo_5
    return
  echo i, " ", j
  c.fn = foo_4
  noop(c)
proc foo_4(c: var C) =
  ## Inner loop increment (`inc j`), then back to the inner loop test.
  injectVar(Env_foo_4, n)
  injectVar(Env_foo_4, i)
  injectVar(Env_foo_4, j)
  j += 1
  c.fn = foo_3
proc foo_5(c: var C) =
  ## Outer loop increment (`inc i`), then back to the outer loop test.
  injectVar(Env_foo_5, n)
  injectVar(Env_foo_5, i)
  injectVar(Env_foo_5, j)
  i += 1
  c.fn = foo_1
proc foo_6(c: var C) =
  ## After the outer loop: `var j = "done"` (stored as the `Done` enum
  ## value), then suspend before printing it.
  injectVar(Env_foo_6, n)
  injectVar(Env_foo_6, i)
  injectVar(Env_foo_6, j)
  c.fn = foo_7
  j = Done
  noop(c)
proc foo_7(c: var C) =
  ## Final step: print `j`, then end the continuation by clearing `fn`.
  injectVar(Env_foo_7, j)
  injectVar(Env_foo_7, i)
  injectVar(Env_foo_7, n)
  echo j
  c.fn = nil
# Create the initial continuation, then trampoline it: keep running steps
# until one of them clears `fn`.
var c = foo(3)
while not c.fn.isNil:
  when supportsCopyMem(C):
    c.fn(c)
  else:
    c.fn(c[])
Five years ago I wrote an outline of how I would implement continuations at a low level: https://github.com/mratsim/weave-io-research/blob/d12c0b5/implementation/README.md
API levels
In the library we name
- "Continuation" a continuation of a resumable function that has no return value.
- "Coroutine" a continuation of a resumable function that yields values.
In terms of high-level capabilities, coroutines are an alternative implementation of Nim's already-existing closure iterators. As such, even though the difference is minimal, we consider them higher-level than raw continuations.
We envision the following API levels built on resumable functions:
1. Raw continuations, interacted through:
- {.suspend.} for functions that will suspend their caller
- {.resumable.} for functions that can be suspended
- bindCallerContinuation to allow storing the {.resumable.} caller within a {.suspend.} function. Schedulers will use this to continue a computation at a more opportune time (async IO) or on more opportune resources (multithreading).
2. Coroutines, interacted through:
- {.coro.} tags a function as a coroutine. Hopefully coro ultimately becomes a procedure declarator like proc and func
- yield to denote suspension points.
3. Schedulers, interacted through:
- async/await for IO tasks, building on bindCallerContinuation
- spawn/sync for CPU tasks, building on bindCallerContinuation
- Futures/Flowvar, which may build on coroutines.
4. Stdlib API:
- iterutils, chainable iterators based on coroutines
- for sequtils
- for strutils
- asyncstreams, suspendable streams based on coroutines
- non-blocking IO
Note: Continuations are a boon for ergonomics, flexibility and composability, for example to compose an implicit suspendable state machine between streams coming from various threads, IO and iterators. Nonetheless, a `var openArray` API is simpler and more efficient for a single use of, say, `toLowerAscii` (unless we can have disappearing coroutines https://godbolt.org/g/26viuZ), and a compile-time iterator API has unique guarantees that can't be matched by a runtime mechanism.
5. High-level concurrency:
- Ergonomic lambda for anonymous functions, coroutines, channels.
- collect support for chained coroutines
- Communicating Sequential Processes (CSP) (= named channel + anonymous coroutine)
- Actors (= named coroutine + anonymous channel)
For the background on what they enable, people are encouraged to read here: https://github.com/mratsim/weave-io-research/blob/d12c0b5/design/design_2_continuations.md
And for an in-depth overview on how other languages and framework designed and implement them you can read here: https://github.com/mratsim/weave-io-research/blob/d12c0b5/research/README.md
This includes:
I have some basic tests to make sure I can create closure iterators from my suspend/resumable/bindCallerContinuation API here: https://github.com/nim-works/cps/blob/mratsim-public-api-proposal/mratsim/ex03_echoing_truths.nim
This also tests whether it can be composed with a scheduler, so that continuations can be sent across threads, for example for multithreaded async iterator/stream processing:
# Example 3: Testing the proposed continuation API.
import ../cps/core
import macros
# Parked continuations; the driver takes them back out with `pop` (last in,
# first out).
var scheduler: seq[ContinuationOpaque]
# Suspending proc: prints the value, then parks the suspended caller's
# continuation at the end of `scheduler` so it can be resumed later.
proc echoingTruth(i: int) {.suspend.}=
  echo "Truth: ", $i
  scheduler.setLen(scheduler.len + 1)
  scheduler[^1] = move bindCallerContinuation()
  # assert bindCallerContinuation() == nil
  # how to enforce moves?
# Resumable counter: for each i in lo..hi, suspends through echoingTruth(i),
# which prints i and parks this proc's continuation in `scheduler`.
proc truthOrDare(lo: int, hi: int) {.resumable.} =
  var i: int = lo
  while i <= hi:
    suspendAfter echoingTruth(i)
    inc i
# Start the resumable proc. Each echoingTruth call parks the continuation in
# `scheduler`, so pop it back out and resume it to make further progress.
var a = truthOrDare(1, 4)
resume(a)
var b = scheduler.pop()
resume(b)
var c = scheduler.pop()
resume(c)
The documentation of the macros is here: https://github.com/nim-works/cps/blob/mratsim-public-api-proposal/cps/core/public_api.nim#L339-L452
# Proc definitions
# --------------------------------------------------------------------------------------------
macro resumable*(def: untyped): untyped =
  ## Create a resumable procedure.
  ## A resumable procedure can suspend its execution
  ## and return control to its current caller.
  ## A resumable procedure starts suspended and returns a handle
  ## to be able to resume it.
  ##
  ## A resumable procedure may call `{.suspend.}` procedures,
  ## that will suspend and return control to the caller.
  ##
  ## A resumable procedure handle cannot be copied, only moved
  ## and MUST be mutable. It can be moved and resumed from any thread.
  ##
  ## Suspension points are calls to `{.suspend.}` procedures, which MUST be
  ## made with `suspendAfter myProc(...)`.
  ## A resumable procedure cannot have a result type; use a coroutine instead.
  ##
  ## If a resumable procedure captures resources that are non-trivial
  ## to release, cancellation MUST be cooperative:
  ## the resumable function should use a channel as "cancellation token"
  ## that would be checked after each suspension point.
  ##
  ## Due to non-linear, movable and interruptible control flow, there are important caveats:
  ## - Using {.threadvar.} will result in undefined behavior.
  ##   Resumable functions have their own local storage.
  ## - `alloca` will not be preserved across suspension points.
  ## - `setjmp`/`longjmp` across suspension points will result in undefined behavior.
  ##   Nim exceptions will be special-cased.
  def.expectKind(nnkProcDef)
  return def.resumableImpl()
macro suspend*(def: untyped): untyped =
  ## Tag a proc `{.suspend.}` to make it able to suspend its caller.
  ##
  ## Such a proc:
  ## - can capture the raw continuation of its caller, e.g. to push it onto
  ##   a scheduler queue and resume it at a more opportune time;
  ## - may only be called from a `{.resumable.}` proc or another
  ##   `{.suspend.}` proc;
  ## - MUST be invoked as `suspendAfter myProc(...)`.
  ##
  ## Use `bindCallerContinuation` to save the caller's continuation.
  ## Never running a saved continuation amounts to cancelling it.
  ##
  ## When a resumable procedure holds resources that are non-trivial to
  ## release, cancellation MUST be cooperative: the resumable function should
  ## check a channel used as a "cancellation token" after each suspension
  ## point.
  ##
  ## Caveats stemming from non-linear, movable and interruptible control flow:
  ## - `{.threadvar.}` access is undefined behavior; resumable functions
  ##   have their own local storage.
  ## - `alloca` memory does not survive suspension points.
  ## - `setjmp`/`longjmp` across a suspension point is undefined behavior;
  ##   Nim exceptions will be special-cased.
  expectKind(def, nnkProcDef)
  result = suspendProcDefImpl(def)
macro coro*(def: untyped): untyped =
  ## Turn a `proc` or `func` into a coroutine.
  ##
  ## A coroutine can suspend itself and hand control back to its current
  ## caller. It starts out suspended; calling it returns a handle used to
  ## resume it.
  ##
  ## The handle is move-only (it cannot be copied) and MUST be mutable.
  ##
  ## Every suspension point is a `yield` of a correctly typed value; a
  ## coroutine must yield values, so use a resumable procedure when there is
  ## nothing to yield.
  ##
  ## Caveats stemming from non-linear, movable and interruptible control flow:
  ## - `{.threadvar.}` access is undefined behavior; resumable functions
  ##   have their own local storage.
  ## - `alloca` memory does not survive suspension points.
  ## - `setjmp`/`longjmp` across a suspension point is undefined behavior;
  ##   Nim exceptions will be special-cased.
  expectKind(def, {nnkProcDef, nnkFuncDef})
  result = def.coroProcDefImpl()
# Calls
# --------------------------------------------------------------------------------------------
proc pull*(coro: var Coroutine): auto {.inline.} =
  ## Resume a coroutine until its next `yield`.
  ##
  ## Returns the yielded value, moved out of `coro.promise`. If the coroutine
  ## stops (`fn` becomes nil) without yielding, `hasFinished` is set and an
  ## empty (default) promise is returned.
  # Named `pull` rather than `resume` to avoid colliding with the
  # plain-continuation overload.
  while coro.fn != nil and coro.promise.isNone():
    coro.fn(coro)
  if not coro.promise.isSome():
    coro.hasFinished = true
    return default(typeof(coro.promise)) # none
  # TODO: set hasFinished here or we will be one iteration late.
  # - a naive solution would be to run the loop here.
  # - another is to check if coro.fn is nil or without cpsCall.
  return move coro.promise
proc resume*(cont: var (Continuation|ContinuationOpaque)) {.inline.} =
  ## Resume a continuation until its next `suspend`: keep running steps
  ## until a step leaves `fn` nil.
  static: doAssert not (cont is Coroutine), "Dispatch overload bug"
  while not cont.fn.isNil:
    cont.fn(cont)
proc suspendAfter*(procCall: auto): auto =
  ## Call a suspending function.
  ## Suspending functions are defined with {.suspend.}
  ##
  ## `suspendAfter` is only valid in a {.resumable.} or {.suspend.} context.
  ##
  ## TODO, should be cps rewritten to get the continuation
  # The `auto` parameter makes this proc generic, so its body (and the
  # `{.error.}` below) is only checked when a call is actually instantiated.
  # Presumably the {.resumable.}/{.suspend.} macros rewrite valid calls away,
  # so only misplaced calls reach this error. TODO confirm against the macros.
  {.error: "suspendAfter is only valid in a {.resumable.} or {.suspend.} context.".}
Sweet! Looks usable at a quick glance. One note is that having access to the raw underlying mechanisms / data types is very useful. Nimony should make that easier, but getting access to the environment objects and their types makes it extensible. The same goes for closures, actually.
Also, defining the semantics of `defer` and `finally` is hard but important, as any async system will need to handle them.
And with this refined design a coroutine can call other coroutines while still leaving it to the scheduler to drive the progress:
type
  # One step of a coroutine: runs on the state at `coro` and returns the
  # continuation to run next (the drivers stop when its `fn` is nil).
  ContinuationProc = proc (coro: ptr CoroutineBase): Continuation {.nimcall.}
  # A resumable point: the step to run plus the state it runs on.
  Continuation = object
    fn: ContinuationProc
    env: ptr CoroutineBase
  # Common header of every coroutine state; `caller` is the continuation a
  # finished coroutine hands control back to.
  CoroutineBase = object of RootObj
    caller: Continuation
method abort(coro: ptr CoroutineBase) =
  # Cancellation hook; concrete coroutines override it.
  discard "to override"
type
  # Growable raw memory block that holds the state of a callee coroutine.
  Frame = object
    mem: ptr CoroutineBase
    currentSize: int
proc ensureFrameSize(this: var Frame; frameSize: int) =
  ## Grow `this.mem` to at least `frameSize` bytes; never shrinks.
  ## `realloc0` keeps the existing bytes and zero-fills the new ones.
  if frameSize <= this.currentSize:
    return
  let grown = realloc0(this.mem, this.currentSize, frameSize)
  this.mem = cast[ptr CoroutineBase](grown)
  this.currentSize = frameSize
type
  # State of the counting coroutine: counter `x` running over `a..b`.
  ThisCoroutine = object of CoroutineBase
    x, a, b: int
    frame: Frame # a coroutine that calls other coroutines needs a frame to store their state
  # State of the nested "async echo" coroutine.
  EchoCoroutine = object of CoroutineBase
    message: string
# Erase a concrete step proc (e.g. taking `ptr ThisCoroutine`) to the generic
# `ContinuationProc`. This relies on every coroutine state starting with the
# `CoroutineBase` header, so the `env` pointer can be reinterpreted.
template asCont(p: typed): ContinuationProc = cast[ContinuationProc](p)
proc cleanEcho(coro: ptr EchoCoroutine): Continuation =
  ## Final step of the echo coroutine: release its string and hand control
  ## back to the caller's continuation.
  echo "cleaning echo"
  result = coro.caller
  # The state lives in raw frame memory. If the frame is reused,
  # `res[] = EchoCoroutine(...)` destroys the old value again, so mark the
  # string as moved-from: the second destroy is then a no-op, not a double free.
  `=destroy`(coro.message)
  wasMoved(coro.message)
proc asyncEchoRun(coro: ptr EchoCoroutine): Continuation =
  ## Only work step of the echo coroutine: print the message, then continue
  ## with the cleanup step on the same state.
  echo coro.message
  Continuation(env: coro, fn: asCont cleanEcho)
proc asyncEchoCreate(res: ptr EchoCoroutine; message: sink string;
                     caller: Continuation): Continuation =
  ## Construct the echo coroutine in caller-provided memory `res` and return
  ## its first continuation. Its cleanup step returns control to `caller`.
  res[] = EchoCoroutine(caller: caller, message: message)
  Continuation(env: res, fn: asCont asyncEchoRun)
proc cleanup(coro: ptr ThisCoroutine): Continuation =
  ## Final step of the counting coroutine: release the frame used for callee
  ## coroutines and stop (returns a continuation with a nil `fn`).
  echo "destroying coroutine ", coro.x
  # The frame was obtained with `realloc0` in `ensureFrameSize` and nothing
  # else frees it. Reset it after freeing so that a second call (e.g. via
  # `abort`) does not free it twice.
  if coro.frame.mem != nil:
    dealloc(coro.frame.mem)
    coro.frame.mem = nil
    coro.frame.currentSize = 0
  result = Continuation(fn: nil, env: nil)
method abort(coro: ptr ThisCoroutine) =
  ## Cancel by running the cleanup step directly; the continuation it
  ## returns is ignored.
  discard coro.cleanup()
proc l3(coro: ptr ThisCoroutine): Continuation

proc l2(coro: ptr ThisCoroutine): Continuation =
  ## Loop test: once `x` passes `b`, continue with `cleanup`; otherwise
  ## print `x` and continue with the increment step.
  if coro.x <= coro.b:
    result = Continuation(fn: asCont l3, env: coro)
    echo coro.x
  else:
    result = Continuation(fn: asCont cleanup, env: coro)

proc l1(coro: ptr ThisCoroutine): Continuation =
  ## Start the nested echo coroutine inside this coroutine's frame; it
  ## continues at `l2` when done.
  ensureFrameSize(coro.frame, sizeof(EchoCoroutine))
  let echoState = cast[ptr EchoCoroutine](coro.frame.mem)
  # Pass the outer coroutine pointer instead of the continuation
  let resumeAt = Continuation(fn: asCont l2, env: coro)
  result = asyncEchoCreate(echoState, "hello", resumeAt)

proc l0(coro: ptr ThisCoroutine): Continuation =
  ## Entry step: start counting at `a`.
  coro.x = coro.a
  l1(coro)

proc l3(coro: ptr ThisCoroutine): Continuation =
  ## Increment step; falls straight through to the loop test.
  inc coro.x
  l2(coro)
proc countup(res: ptr ThisCoroutine; a, b: int; caller: Continuation): Continuation =
  ## Initialize a counting coroutine over `a..b` in caller-provided memory
  ## `res` and return its first continuation (step `l0`).
  res[] = ThisCoroutine(a: a, b: b, caller: caller)
  Continuation(env: res, fn: asCont l0)
# Drive a single coroutine to completion: its state lives in `it`, and each
# step returns the next continuation until one has a nil `fn`.
var it: ThisCoroutine
var pc = countup(addr it, 0, 10, Continuation())
while not pc.fn.isNil:
  pc = pc.fn(pc.env)
And here is what spawn and a scheduler would look like:
type
  Scheduler = object
    coroutines: seq[Continuation]  # one pending continuation per coroutine
proc initScheduler(): Scheduler =
  ## Create a scheduler with no coroutines.
  Scheduler(coroutines: @[])
proc spawn(sched: var Scheduler; cont: Continuation) =
  ## Spawn a new coroutine to run concurrently by appending its continuation
  ## to the end of the run list.
  add(sched.coroutines, cont)
proc run(sched: var Scheduler) =
  ## Run all coroutines until completion using round-robin scheduling.
  ##
  ## Each pass gives every live coroutine exactly one step, in spawn order.
  ## A continuation with `fn == nil` means its coroutine has finished; it is
  ## removed in place and the pass continues with the next coroutine. The
  ## pass does not restart from index 0, which would give the coroutines
  ## before it an extra step.
  while sched.coroutines.len > 0:
    var i = 0
    while i < sched.coroutines.len:
      let pc = sched.coroutines[i]
      if pc.fn != nil:
        # Run one step of this coroutine and remember where it continues.
        sched.coroutines[i] = pc.fn(pc.env)
        inc i
      else:
        # Finished: `delete` keeps the remaining order and shifts the next
        # coroutine into index `i`, so do not advance.
        sched.coroutines.delete(i)
# Example usage with spawn
var scheduler = initScheduler()
# Spawn multiple coroutines. Their states use fresh names: this snippet
# extends the single-coroutine example, which already declares a
# module-level `it`, and redeclaring it is a compile error.
var it1: ThisCoroutine
scheduler.spawn(countup(addr it1, 0, 5, Continuation(fn: nil, env: nil)))
var it2: ThisCoroutine
scheduler.spawn(countup(addr it2, 10, 15, Continuation(fn: nil, env: nil)))
# Run all coroutines concurrently
scheduler.run()
Now this only needs a `.cps` macro, and the scheduler needs to grow an event loop...
There's a PoC for inline iterator composability. Mostly works:
https://github.com/beef331/slicerator/blob/master/src/slicerator/itermacros.nim
In the `.cps` macro there should be a way to patch the continuation flow manually, or something similar.
I mean a function that can receive the next `Continuation(fn, env)` before returning anything to the scheduler, and store it for later while returning `fn: nil` for now;
then, when some async action happens, it returns the saved continuation to the scheduler.