forked from sarchlab/akita
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdirectconnection.go
145 lines (117 loc) · 3.07 KB
/
directconnection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package akita
// directConnectionEnd holds the state that a DirectConnection keeps for one
// plugged-in port.
type directConnectionEnd struct {
// port is the port this end belongs to; it is the port that gets notified
// after a rejected Send once a message has been forwarded.
port Port
// buf queues the messages accepted by Send from this port until they are
// delivered to their destination ports.
buf []Msg
// bufSize is the number of messages buf may hold before Send starts
// rejecting messages from this port.
bufSize int
// busy records that a Send from this port was rejected because buf was
// full.
busy bool
}
// DirectConnection connects components through their ports and delivers
// messages between them without latency.
type DirectConnection struct {
*TickingComponent
// nextPortID is the index in ports at which the next Tick starts visiting
// ports; it advances by one after every Tick.
nextPortID int
// ports lists the plugged-in ports in the order they were plugged in.
ports []Port
// ends maps every plugged-in port to its buffering state.
ends map[Port]*directConnectionEnd
}
// PlugIn registers the port with this DirectConnection and tells the port
// that it is now connected to it. sourceSideBufSize is the number of messages
// sent from the port that the connection buffers before rejecting further
// sends.
func (c *DirectConnection) PlugIn(port Port, sourceSideBufSize int) {
	c.Lock()
	defer c.Unlock()

	c.ports = append(c.ports, port)
	c.ends[port] = &directConnectionEnd{
		port:    port,
		bufSize: sourceSideBufSize,
	}

	port.SetConnection(c)
}
// Unplug is meant to mark the port as no longer connected to this
// DirectConnection. It is not implemented yet and always panics.
func (c *DirectConnection) Unplug(port Port) {
panic("not implemented")
}
// NotifyAvailable is called by a port to notify that the connection can
// deliver to the port again. The connection responds by calling TickNow with
// the given time; the port argument itself is not used.
func (c *DirectConnection) NotifyAvailable(now VTimeInSec, port Port) {
c.TickNow(now)
}
// Send accepts a message for delivery. The message is validated first (an
// invalid message causes a panic), then queued in the buffer of its source
// port, and the connection is ticked at the message's send time.
//
// If the buffer of the source port is already full, the message is not
// queued, the source end is marked busy, and a SendError is returned.
func (c *DirectConnection) Send(msg Msg) *SendError {
	c.Lock()
	defer c.Unlock()

	c.msgMustBeValid(msg)

	end := c.ends[msg.Meta().Src]
	if bufferFull := len(end.buf) >= end.bufSize; bufferFull {
		// Remember the rejection so the sender can be notified when a
		// message from this end is forwarded.
		end.busy = true
		return NewSendError()
	}

	end.buf = append(end.buf, msg)
	c.TickNow(msg.Meta().SendTime)

	return nil
}
// msgMustBeValid panics unless the message has non-nil source and destination
// ports that are both plugged into this connection and that differ from each
// other. The checks run in the order written, so the first failing check
// determines the panic message.
func (c *DirectConnection) msgMustBeValid(msg Msg) {
c.portMustNotBeNil(msg.Meta().Src)
c.portMustNotBeNil(msg.Meta().Dst)
c.portMustBeConnected(msg.Meta().Src)
c.portMustBeConnected(msg.Meta().Dst)
c.srcDstMustNotBeTheSame(msg)
}
// portMustNotBeNil panics if the given port is nil.
func (c *DirectConnection) portMustNotBeNil(port Port) {
	if port != nil {
		return
	}

	panic("src or dst is not given")
}
// portMustBeConnected panics if the given port has not been plugged into this
// connection.
func (c *DirectConnection) portMustBeConnected(port Port) {
	_, known := c.ends[port]
	if known {
		return
	}

	panic("src or dst is not connected")
}
// srcDstMustNotBeTheSame panics if the message's source port and destination
// port are the same port.
func (c *DirectConnection) srcDstMustNotBeTheSame(msg Msg) {
	src, dst := msg.Meta().Src, msg.Meta().Dst
	if src != dst {
		return
	}

	panic("sending back to src")
}
// Tick updates the states of the connection and delivers messages.
//
// Every plugged-in port is visited once, starting at index nextPortID and
// wrapping around, and as many of its buffered messages as possible are
// forwarded. Afterwards nextPortID advances by one, so the next Tick starts
// with a different port.
//
// It returns true if at least one message was delivered. When no port is
// plugged in there is nothing to deliver and Tick returns false.
func (c *DirectConnection) Tick(now VTimeInSec) bool {
	if len(c.ports) == 0 {
		// Without this guard the modulo operations below would divide by
		// zero and panic.
		return false
	}

	madeProgress := false

	for i := 0; i < len(c.ports); i++ {
		port := c.ports[(c.nextPortID+i)%len(c.ports)]
		if c.forwardMany(c.ends[port], now) {
			madeProgress = true
		}
	}

	c.nextPortID = (c.nextPortID + 1) % len(c.ports)

	return madeProgress
}
// forwardMany delivers the messages buffered at the given end, in order, until
// the buffer is empty or a destination port refuses a message. Each message is
// stamped with now as its receive time before it is handed to its destination.
//
// If a Send from this end was rejected earlier (end.busy), the end's port is
// notified after a successful delivery that it may send again.
//
// It returns true if at least one message was delivered.
func (c *DirectConnection) forwardMany(
	end *directConnectionEnd,
	now VTimeInSec,
) bool {
	madeProgress := false

	for len(end.buf) > 0 {
		head := end.buf[0]
		head.Meta().RecvTime = now

		if err := head.Meta().Dst.Recv(head); err != nil {
			// The destination cannot take the message now. Keep it at the
			// head of the buffer so that delivery order is preserved.
			break
		}

		madeProgress = true

		// Zero the vacated slot so the backing array no longer references
		// the delivered message once the slice moves past it.
		var delivered Msg
		end.buf[0] = delivered
		end.buf = end.buf[1:]

		if end.busy {
			// Clear the flag before notifying. If the notified port sends
			// right away and is rejected again, Send sets busy again and that
			// new mark must not be overwritten here.
			end.busy = false
			end.port.NotifyAvailable(now)
		}
	}

	return madeProgress
}
// NewDirectConnection creates a new DirectConnection with no ports plugged
// in. The name, engine, and freq arguments are passed on to
// NewSecondaryTickingComponent, together with the new connection itself, to
// build the embedded TickingComponent.
func NewDirectConnection(
	name string,
	engine Engine,
	freq Freq,
) *DirectConnection {
	dc := &DirectConnection{
		ends: make(map[Port]*directConnectionEnd),
	}
	dc.TickingComponent = NewSecondaryTickingComponent(name, engine, freq, dc)

	return dc
}