Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gomemif improvemets #228

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions extras/gomemif/memif/control_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ type controlChannel struct {
// Socket represents a UNIX domain socket used for communication
// between memif peers
type Socket struct {
appName string
filename string
listener *listener
interfaceList *list.List
ccList *list.List
epfd int
interruptfd int
wakeEvent syscall.EpollEvent
stopPollChan chan struct{}
wg sync.WaitGroup
appName string
filename string
intfQueueEventMap map[int]memifIntfQueue
listener *listener
interfaceList *list.List
ccList *list.List
epfd int
interruptfd int
wakeEvent syscall.EpollEvent
stopPollChan chan struct{}
wg sync.WaitGroup
}

type interrupt struct {
Expand All @@ -83,6 +84,11 @@ type memifInterrupt struct {
qid uint16
}

type memifIntfQueue struct {
intf *Interface
qid uint16
}

// StopPolling stops polling events on the socket
func (socket *Socket) StopPolling() error {
if socket.stopPollChan != nil {
Expand Down Expand Up @@ -216,6 +222,7 @@ func NewSocket(appName string, filename string) (socket *Socket, err error) {
socket.filename = DefaultSocketFilename
}

socket.intfQueueEventMap = make(map[int]memifIntfQueue)
socket.epfd, _ = syscall.EpollCreate1(0)

efd, err := eventFd()
Expand All @@ -236,18 +243,10 @@ func (socket *Socket) handleEvent(event *syscall.EpollEvent) error {
if socket.listener != nil && socket.listener.event.Fd == event.Fd {
return socket.listener.handleEvent(event)
}
for elt := socket.interfaceList.Front(); elt != nil; elt = elt.Next() {
intf := elt.Value.(*Interface)
if intf.args.InterruptFunc != nil {
for rx_qid := 0; rx_qid < int(intf.GetMemoryConfig().NumQueuePairs); rx_qid++ {
queue, _ := intf.GetRxQueue(rx_qid)
interruptFd, _ := queue.GetEventFd()
if int(event.Fd) == interruptFd {
intf.onInterrupt(intf, rx_qid)
return nil
}
}
}

intfQueue, found := socket.intfQueueEventMap[int(event.Fd)]
if found {
return intfQueue.intf.onInterrupt(intfQueue.intf, int(intfQueue.qid))
}

for elt := socket.ccList.Front(); elt != nil; elt = elt.Next() {
Expand Down
7 changes: 6 additions & 1 deletion extras/gomemif/memif/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func (i *Interface) connect() (err error) {
q.lastTail = 0
}

for _, q := range i.rxQueues {
for qid, q := range i.rxQueues {
q.updateRing()

if q.ring.getCookie() != cookie {
Expand All @@ -534,6 +534,11 @@ func (i *Interface) connect() (err error) {

q.lastHead = 0
q.lastTail = 0

if i.args.InterruptFunc != nil {
interruptFd, _ := q.GetEventFd()
i.socket.intfQueueEventMap[interruptFd] = memifIntfQueue{i, uint16(qid)}
}
}

return i.args.ConnectedFunc(i)
Expand Down
12 changes: 10 additions & 2 deletions extras/gomemif/memif/packet_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (q *Queue) Rx_burst(pkt []MemifPacketBuffer) (uint16, error) {
var length uint32
var offset uint32
var nSlots uint16
var readQueueInterrupt bool = true
var desc descBuf = newDescBuf()

if q.i.args.IsMaster {
Expand All @@ -124,6 +125,11 @@ func (q *Queue) Rx_burst(pkt []MemifPacketBuffer) (uint16, error) {
return 0, nil
}

if nSlots > uint16(len(pkt)) {
nSlots = uint16(len(pkt))
readQueueInterrupt = false
}

rx := 0
for nSlots > 0 {
// copy descriptor from shm
Expand All @@ -144,8 +150,10 @@ func (q *Queue) Rx_burst(pkt []MemifPacketBuffer) (uint16, error) {

}

b := make([]byte, 8)
syscall.Read(int(q.interruptFd), b)
if readQueueInterrupt {
b := make([]byte, 8)
syscall.Read(int(q.interruptFd), b)
}

return uint16(rx), nil
}
Expand Down
Loading