Skip to content

Commit

Permalink
ARC works for async on Windows (#13179)
Browse files Browse the repository at this point in the history
  • Loading branch information
Araq authored Jan 17, 2020
1 parent 796aafe commit 7626907
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 40 deletions.
4 changes: 2 additions & 2 deletions compiler/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ proc canFormAcycleAux(marker: var IntSet, typ: PType, startId: int): bool =
else: discard

proc isFinal*(t: PType): bool =
var t = t.skipTypes(abstractInst)
result = t.kind != tyObject or tfFinal in t.flags
let t = t.skipTypes(abstractInst)
result = t.kind != tyObject or tfFinal in t.flags or isPureObject(t)

proc canFormAcycle*(typ: PType): bool =
var marker = initIntSet()
Expand Down
56 changes: 29 additions & 27 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,18 @@ when defined(windows) or defined(nimdoc):
ioPort: Handle
handles: HashSet[AsyncFD]

CustomOverlapped = object of OVERLAPPED
CustomObj = object of OVERLAPPED
data*: CompletionData

PCustomOverlapped* = ref CustomOverlapped
CustomRef* = ref CustomObj

AsyncFD* = distinct int

PostCallbackData = object
ioPort: Handle
handleFd: AsyncFD
waitFd: Handle
ovl: owned PCustomOverlapped
ovl: owned CustomRef
PostCallbackDataPtr = ptr PostCallbackData

AsyncEventImpl = object
Expand Down Expand Up @@ -336,13 +336,15 @@ when defined(windows) or defined(nimdoc):

var lpNumberOfBytesTransferred: DWORD
var lpCompletionKey: ULONG_PTR
var customOverlapped: PCustomOverlapped
var customOverlapped: CustomRef
let res = getQueuedCompletionStatus(p.ioPort,
addr lpNumberOfBytesTransferred, addr lpCompletionKey,
cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
result = true
when defined(gcDestructors):
GC_ref(customOverlapped)
# For 'gcDestructors' the destructor of 'customOverlapped' will
# be called at the end and we are the only owner here. This means
# We do not have to 'GC_unref(customOverlapped)' because the destructor
# does that for us.

# http://stackoverflow.com/a/12277264/492186
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
Expand All @@ -359,7 +361,8 @@ when defined(windows) or defined(nimdoc):
if customOverlapped.data.cell.data != nil:
system.dispose(customOverlapped.data.cell)

GC_unref(customOverlapped)
when not defined(gcDestructors):
GC_unref(customOverlapped)
else:
let errCode = osLastError()
if customOverlapped != nil:
Expand All @@ -368,7 +371,8 @@ when defined(windows) or defined(nimdoc):
lpNumberOfBytesTransferred, errCode)
if customOverlapped.data.cell.data != nil:
system.dispose(customOverlapped.data.cell)
GC_unref(customOverlapped)
when not defined(gcDestructors):
GC_unref(customOverlapped)
else:
if errCode.int32 == WAIT_TIMEOUT:
# Timed out
Expand Down Expand Up @@ -409,6 +413,13 @@ when defined(windows) or defined(nimdoc):
getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
close(dummySock)

proc newCustom*(): CustomRef =
result = CustomRef() # 0
GC_ref(result) # 1 prevent destructor from doing a premature free.
# destructor of newCustom's caller --> 0. This means
# Windows holds a ref for us with RC == 0 (single owner).
# This is passed back to us in the IO completion port.

proc recv*(socket: AsyncFD, size: int,
flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
## Reads **up to** ``size`` bytes from ``socket``. Returned future will
Expand All @@ -435,8 +446,7 @@ when defined(windows) or defined(nimdoc):

var bytesReceived: DWORD
var flagsio = flags.toOSFlags().DWORD
var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -512,8 +522,7 @@ when defined(windows) or defined(nimdoc):

var bytesReceived: DWORD
var flagsio = flags.toOSFlags().DWORD
var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -565,8 +574,7 @@ when defined(windows) or defined(nimdoc):
dataBuf.len = size.ULONG

var bytesReceived, lowFlags: DWORD
var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -616,8 +624,7 @@ when defined(windows) or defined(nimdoc):
zeroMem(addr(staddr[0]), 128)
copyMem(addr(staddr[0]), saddr, saddrLen)

var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -658,8 +665,7 @@ when defined(windows) or defined(nimdoc):
var bytesReceived = 0.DWORD
var lowFlags = 0.DWORD

var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -754,8 +760,7 @@ when defined(windows) or defined(nimdoc):
clientSock.close()
retFuture.fail(getCurrentException())

var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -799,7 +804,7 @@ when defined(windows) or defined(nimdoc):

{.push stackTrace: off.}
proc waitableCallback(param: pointer,
timerOrWaitFired: WINBOOL): void {.stdcall.} =
timerOrWaitFired: WINBOOL) {.stdcall.} =
var p = cast[PostCallbackDataPtr](param)
discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
ULONG_PTR(p.handleFd),
Expand All @@ -815,8 +820,7 @@ when defined(windows) or defined(nimdoc):
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
pcd.ioPort = p.ioPort
pcd.handleFd = fd
var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()

ol.data = CompletionData(fd: fd, cb:
proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
Expand Down Expand Up @@ -931,8 +935,7 @@ when defined(windows) or defined(nimdoc):
let handleFD = AsyncFD(hEvent)
pcd.ioPort = p.ioPort
pcd.handleFd = handleFD
var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data.fd = handleFD
ol.data.cb = handleCallback
# We need to protect our callback environment value, so GC will not free it
Expand Down Expand Up @@ -1621,8 +1624,7 @@ when defined(windows) or defined(nimdoc):
let retFuture = newFuture[void]("doConnect")
result = retFuture

var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: socket, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down
12 changes: 4 additions & 8 deletions lib/pure/asyncfile.nim
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
var retFuture = newFuture[int]("asyncfile.readBuffer")

when defined(windows) or defined(nimdoc):
var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: f.fd, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -212,8 +211,7 @@ proc read*(f: AsyncFile, size: int): Future[string] =
when defined(windows) or defined(nimdoc):
var buffer = alloc0(size)

var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: f.fd, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -340,8 +338,7 @@ proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
## specified file.
var retFuture = newFuture[void]("asyncfile.writeBuffer")
when defined(windows) or defined(nimdoc):
var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: f.fd, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down Expand Up @@ -414,8 +411,7 @@ proc write*(f: AsyncFile, data: string): Future[void] =
var buffer = alloc0(data.len)
copyMem(buffer, addr copy[0], data.len)

var ol = PCustomOverlapped()
GC_ref(ol)
var ol = newCustom()
ol.data = CompletionData(fd: f.fd, cb:
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
if not retFuture.finished:
Expand Down
9 changes: 6 additions & 3 deletions lib/system/refs_v2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ proc nimDecWeakRef(p: pointer) {.compilerRtl, inl.} =

proc nimIncRef(p: pointer) {.compilerRtl, inl.} =
inc head(p).rc, rcIncrement
#cprintf("[INCREF] %p\n", p)
when traceCollector:
cprintf("[INCREF] %p\n", head(p))

proc nimRawDispose(p: pointer) {.compilerRtl.} =
when not defined(nimscript):
Expand Down Expand Up @@ -131,11 +132,13 @@ proc nimDecRefIsLast(p: pointer): bool {.compilerRtl, inl.} =
var cell = head(p)
if (cell.rc and not rcMask) == 0:
result = true
#cprintf("[DESTROY] %p\n", p)
when traceCollector:
cprintf("[ABOUT TO DESTROY] %p\n", cell)
else:
dec cell.rc, rcIncrement
# According to Lins it's correct to do nothing else here.
#cprintf("[DeCREF] %p\n", p)
when traceCollector:
cprintf("[DeCREF] %p\n", cell)

proc GC_unref*[T](x: ref T) =
## New runtime only supports this operation for 'ref T'.
Expand Down
69 changes: 69 additions & 0 deletions tests/arc/tasyncawait.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
discard """
output: "5000"
cmd: "nim c --gc:arc $file"
"""

import asyncdispatch, asyncnet, nativesockets, net, strutils, os

var msgCount = 0

const
swarmSize = 50
messagesToSend = 100

var clientCount = 0

proc sendMessages(client: AsyncFD) {.async.} =
for i in 0 ..< messagesToSend:
await send(client, "Message " & $i & "\c\L")

proc launchSwarm(port: Port) {.async.} =
for i in 0 ..< swarmSize:
var sock = createAsyncNativeSocket()

await connect(sock, "localhost", port)
await sendMessages(sock)
closeSocket(sock)

proc readMessages(client: AsyncFD) {.async.} =
# wrapping the AsyncFd into a AsyncSocket object
var sockObj = newAsyncSocket(client)
var (ipaddr, port) = sockObj.getPeerAddr()
doAssert ipaddr == "127.0.0.1"
(ipaddr, port) = sockObj.getLocalAddr()
doAssert ipaddr == "127.0.0.1"
while true:
var line = await recvLine(sockObj)
if line == "":
closeSocket(client)
clientCount.inc
break
else:
if line.startswith("Message "):
msgCount.inc
else:
doAssert false

proc createServer(port: Port) {.async.} =
var server = createAsyncNativeSocket()
block:
var name: Sockaddr_in
name.sin_family = typeof(name.sin_family)(toInt(AF_INET))
name.sin_port = htons(uint16(port))
name.sin_addr.s_addr = htonl(INADDR_ANY)
if bindAddr(server.SocketHandle, cast[ptr SockAddr](addr(name)),
sizeof(name).Socklen) < 0'i32:
raiseOSError(osLastError())

discard server.SocketHandle.listen()
while true:
asyncCheck readMessages(await accept(server))

asyncCheck createServer(Port(10335))
asyncCheck launchSwarm(Port(10335))
while true:
poll()
if clientCount == swarmSize: break

assert msgCount == swarmSize * messagesToSend
echo msgCount

0 comments on commit 7626907

Please sign in to comment.