Skip to content

Commit

Permalink
objparamserver: fix leak
Browse files Browse the repository at this point in the history
  • Loading branch information
Pylgos committed Oct 21, 2023
1 parent d1e462d commit 6f8f514
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions src/rclnim/objparamservers.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import rclnim
import rclnim/chronossupport
import chronos
import std/[macros, genasts]
import std/[macros, genasts, tables]

proc to(p: ParamValue, To: typedesc[bool]): To = p.boolVal
proc to(p: ParamValue, To: typedesc[int64]): To = p.intVal
Expand All @@ -13,11 +13,22 @@ proc to(p: ParamValue, To: typedesc[seq[float64]]): To = p.doubleArrayVal
proc to(p: ParamValue, To: typedesc[seq[string]]): To = p.strArrayVal

type
ObjParamServer*[T: object] = ref object
internalValue*: T
SharedData[T] = ref object
server: ParamServer
value: T
eventQueue: AsyncEventQueue[ParamEvent]

ObjParamServerObj[T: object] = object
data: SharedData[T]
loop: Future[void]

ObjParamServer*[T: object] = ref ObjParamServerObj[T]

proc `=destroy`[T](self: var ObjParamServerObj[T]) =
self.loop.cancelSoon()
`=destroy`(self.data)
`=destroy`(self.loop)

proc genFieldDispatchImpl(obj: NimNode, n: NimNode, body: NimNode, base: string, caseStmt: NimNode, field: NimNode) =
case n.kind
of nnkObjectTy:
Expand Down Expand Up @@ -49,32 +60,32 @@ macro genFieldDispatch[T](obj: T, name: string, field: untyped, body: untyped):
result = nnkCaseStmt.newTree(name)
genFieldDispatchImpl(obj, typ, body, "", result, field)

proc updateParamLoop[T](self: ObjParamServer[T]) {.async.} =
proc updateParamLoop[T](data: SharedData[T]) {.async.} =
try:
while true:
let events = await self.server.waitForUpdate()
let events = await data.server.waitForUpdate()
if events.len == 0:
return
for ev in events:
genFieldDispatch(self.internalValue, ev.name, field):
genFieldDispatch(data.value, ev.name, field):
field = ev.param.to(typeof(field))
self.eventQueue.emit(ev)
data.eventQueue.emit(ev)
except ShutdownError:
discard

proc value*[T](self: ObjParamServer[T]): lent T =
self.internalValue
self.data.value

proc server*[T](self: ObjParamServer[T]): ParamServer =
self.server
self.data.server

proc eventQueue*[T](self: ObjParamServer[T]): AsyncEventQueue[ParamEvent] =
self.eventQueue

macro set*[T](self: ObjParamServer[T], fld: untyped, value: untyped): untyped =
result = newStmtList()
let fieldName = repr(fld)
let access = parseExpr(repr(self) & ".internalValue." & fieldName)
let access = parseExpr(repr(self) & ".data.value." & fieldName)
let asgn = nnkAsgn.newTree(access, value)
result.add asgn
result.add genAst(self, fieldName, access) do:
Expand All @@ -92,27 +103,27 @@ proc declareParam[T](server: ParamServer, name: string, val: T) =
else:
static: raiseAssert "type " & $T & " is not supported"

proc syncParams[T](self: ObjParamServer[T]) =
for (paramName, paramInfo) in self.server.getAll().pairs:
genFieldDispatch(self.data.value, paramName, field):
field = paramInfo.value.to(typeof(field))

proc createObjParamServer*[T](node: Node, default: T): ObjParamServer[T] =
let server = node.createParamServer()

declareParam(server, "", default)
server.endDeclaration()

result = ObjParamServer[T](
internalValue: default,
server: server,
eventQueue: newAsyncEventQueue[ParamEvent]()
)

asyncSpawn result.updateParamLoop()
result = ObjParamServer[T](data: SharedData[T](server: server, eventQueue: newAsyncEventQueue[ParamEvent](), value: default))
result.syncParams()
result.loop = updateParamLoop(result.data)


when isMainModule:
type
TestB = object
fieldD: string


Test = object
fieldA: int
fieldB: float
Expand Down

0 comments on commit 6f8f514

Please sign in to comment.