Skip to content

Commit

Permalink
feat(ocppj): improve closing "stopped" channel when server is stopping
Browse files Browse the repository at this point in the history
  • Loading branch information
dammarco committed Apr 19, 2024
1 parent 2f245ca commit df9248d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
21 changes: 21 additions & 0 deletions ocppj/central_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -54,6 +55,20 @@ func (suite *OcppJTestSuite) TestServerStoppedError() {
assert.Error(t, err, "ocppj server is not started, couldn't send request")
}

func (suite *OcppJTestSuite) TestServerStopBeforeStart() {
t := suite.T()

// Stop server
suite.mockServer.On("Stop").Return(nil)
suite.centralSystem.Stop()

// Start server (should return)
suite.mockServer.On("Start", mock.AnythingOfType("int"), mock.AnythingOfType("string")).Return(nil)
suite.centralSystem.Start(8887, "/{ws}")

assert.True(t, suite.serverDispatcher.IsRunning())
}

// ----------------- SendRequest tests -----------------

func (suite *OcppJTestSuite) TestCentralSystemSendRequest() {
Expand Down Expand Up @@ -749,3 +764,9 @@ func (suite *OcppJTestSuite) TestServerRequestFlow() {
q, _ = suite.serverRequestMap.Get(mockChargePoint2)
assert.True(t, q.IsEmpty())
}

func TestStopBeforeStart(t *testing.T) {

s := ocppj.NewServer(nil, nil, nil)
s.Stop()
}
9 changes: 7 additions & 2 deletions ocppj/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func NewDefaultServerDispatcher(queueMap ServerQueueMap) *DefaultServerDispatche
requestChannel: nil,
readyForDispatch: make(chan string, 1),
timeout: defaultMessageTimeout,
stoppedC: make(chan struct{}, 1),
}
d.pendingRequestState = NewServerState(&d.mutex)
return d
Expand All @@ -389,7 +390,6 @@ func NewDefaultServerDispatcher(queueMap ServerQueueMap) *DefaultServerDispatche
func (d *DefaultServerDispatcher) Start() {
d.requestChannel = make(chan string, 20)
d.timerC = make(chan string, 10)
d.stoppedC = make(chan struct{}, 1)
d.running = true
go d.messagePump()
}
Expand All @@ -404,7 +404,12 @@ func (d *DefaultServerDispatcher) Stop() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.running = false
close(d.stoppedC)

select {
case <-d.stoppedC:
default:
close(d.stoppedC)
}
}

func (d *DefaultServerDispatcher) SetTimeout(timeout time.Duration) {
Expand Down
9 changes: 7 additions & 2 deletions ocppj/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func NewServer(wsServer ws.WsServer, dispatcher ServerDispatcher, stateHandler S
server: wsServer,
RequestState: stateHandler,
dispatcher: dispatcher,
stopped: make(chan struct{})}
stopped: make(chan struct{}),
}
for _, profile := range profiles {
s.AddProfile(profile)
}
Expand Down Expand Up @@ -148,7 +149,11 @@ func (s *Server) Start(listenPort int, listenPath string) {
// Stops the server.
// This clears all pending requests and causes the Start function to return.
func (s *Server) Stop() {
close(s.stopped)
select {
case <-s.stopped:
default:
close(s.stopped)
}
s.waitGroup.Wait()
s.server.Stop()
s.dispatcher.Stop()
Expand Down

0 comments on commit df9248d

Please sign in to comment.