-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSynchronousQueueMS.kt
103 lines (97 loc) · 3.92 KB
/
SynchronousQueueMS.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
class SynchronousQueueMS<E> : SynchronousQueue<E> {
object RETRY
private enum class NodeTypes {
SENDER, RECEIVER
}
private abstract class Node<E>(
val type: NodeTypes,
val next: AtomicRef<Node<E>?> = atomic(null)
)
private class Sender<E> (
val elem: E,
notAtomicCoroutine: Continuation<Unit>
) : Node<E>(NodeTypes.SENDER) {
val coroutine: AtomicRef<Continuation<Unit>> = atomic(notAtomicCoroutine)
}
private class Receiver<E> (
notAtomicCoroutine: Continuation<E>?
) : Node<E>(NodeTypes.RECEIVER) {
val coroutine: AtomicRef<Continuation<E>?> = atomic(notAtomicCoroutine)
}
private val startNode: Node<E?> = Receiver(null)
private val head: AtomicRef<Node<E?>> = atomic(startNode)
private val tail: AtomicRef<Node<E?>> = atomic(startNode)
@Suppress("UNCHECKED_CAST")
override suspend fun send(element: E) {
while (true) {
val currHead = head.value
val currTail = tail.value
if (currHead == currTail || currTail.type == NodeTypes.SENDER) {
val res = suspendCoroutine<Any> sc@ { cont ->
val newTail = Sender(element, cont)
if (!currTail.next.compareAndSet(null, newTail as Node<E?>)) {
cont.resume(RETRY)
return@sc
} else {
val possibleNextNode = currTail.next.value
if (possibleNextNode != null) {
tail.compareAndSet(currTail, possibleNextNode)
}
}
}
if (res != RETRY) {
return
}
} else {
val currWorkingNode = currHead.next.value
if (currTail != tail.value || currHead != head.value || currWorkingNode == null ) {
continue
}
if (currWorkingNode.type == NodeTypes.RECEIVER && head.compareAndSet(currHead, currWorkingNode)) {
val currCoroutine = (currWorkingNode as Receiver<E>).coroutine.value
currCoroutine?.resume(element)
return
}
}
}
}
@Suppress("UNCHECKED_CAST")
override suspend fun receive(): E {
while (true) {
val currHead = head.value
val currTail = tail.value
if (currHead == currTail || currTail.type == NodeTypes.RECEIVER) {
val res = suspendCoroutine<E?> sc@ { cont ->
val newTail = Receiver<E>(cont)
if (!currTail.next.compareAndSet(null, newTail as Node<E?>)) {
cont.resume(null)
return@sc
} else {
val possibleNextNode = currTail.next.value
if (possibleNextNode != null) {
tail.compareAndSet(currTail, possibleNextNode)
}
}
}
if (res != null) {
return res
}
} else {
val currWorkingNode = currHead.next.value
if (currTail != tail.value || currHead != head.value || currWorkingNode == null ) {
continue
}
if (currWorkingNode.type == NodeTypes.SENDER && head.compareAndSet(currHead, currWorkingNode)) {
val currCoroutine = (currWorkingNode as Sender<E>).coroutine.value
currCoroutine.resume(Unit)
return (currWorkingNode as Sender<E>).elem
}
}
}
}
}