Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 71 additions & 71 deletions portal-ui/bindata_assetfs.go

Large diffs are not rendered by default.

100 changes: 24 additions & 76 deletions restapi/admin_console.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"log"
"strings"
"sync"
"time"

"github.com/gorilla/websocket"
Expand All @@ -32,97 +31,46 @@ import (
const logTimeFormat string = "15:04:05 MST 01/02/2006"

// startConsoleLog starts log of the servers
// by first setting a websocket reader that will
// check for a heartbeat.
//
// A WaitGroup is used to handle goroutines and to ensure
// all finish in the proper order. If any, sendConsoleLogInfo()
// or wsReadCheck() returns, trace should end.
func startConsoleLog(conn WSConn, client MinioAdmin) (mError error) {
// a WaitGroup waits for a collection of goroutines to finish
wg := sync.WaitGroup{}
// a cancel context is needed to end all goroutines used
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set number of goroutines to wait. wg.Wait()
// waitsuntil counter is zero (all are done)
wg.Add(3)
// start go routine for reading websocket heartbeat
readErr := wsReadCheck(ctx, &wg, conn)
// send Stream of Console Log Info to the ws c.connection
logCh := sendConsoleLogInfo(ctx, &wg, conn, client)
// If wsReadCheck returns it means that it is not possible to check
// ws heartbeat anymore so we stop from doing Console Log, cancel context
// for all goroutines.
go func(wg *sync.WaitGroup) {
defer wg.Done()
if err := <-readErr; err != nil {
log.Println("error on wsReadCheck:", err)
mError = err
}
// cancel context for all goroutines.
cancel()
}(&wg)

// get logCh err on finish
if err := <-logCh; err != nil {
mError = err
}

// if logCh closes for any reason,
// cancel context for all goroutines
cancel()
// wait all goroutines to finish
wg.Wait()
return mError
}

// sendlogInfo sends stream of Console Log Info to the ws connection
func sendConsoleLogInfo(ctx context.Context, wg *sync.WaitGroup, conn WSConn, client MinioAdmin) <-chan error {
// decrements the WaitGroup counter
// by one when the function returns
defer wg.Done()
ch := make(chan error)
go func(ch chan<- error) {
defer close(ch)

// TODO: accept parameters as variables
// name of node, default = "" (all)
node := ""
// number of log lines
lineCount := 100
// type of logs "minio"|"application"|"all" default = "all"
logKind := "all"
// Start listening on all Console Log activity.
logCh := client.getLogs(ctx, node, lineCount, logKind)

for logInfo := range logCh {
func startConsoleLog(ctx context.Context, conn WSConn, client MinioAdmin) error {
// TODO: accept parameters as variables
// name of node, default = "" (all)
node := ""
// number of log lines
lineCount := 100
// type of logs "minio"|"application"|"all" default = "all"
logKind := "all"
// Start listening on all Console Log activity.
logCh := client.getLogs(ctx, node, lineCount, logKind)

for {
select {
case <-ctx.Done():
return nil
case logInfo, ok := <-logCh:
// zero value returned because the channel is closed and empty
if !ok {
return nil
}
if logInfo.Err != nil {
log.Println("error on console logs:", logInfo.Err)
ch <- logInfo.Err
return
return logInfo.Err
}

// Serialize message to be sent
bytes, err := json.Marshal(serializeConsoleLogInfo(&logInfo))
if err != nil {
fmt.Println("error on json.Marshal:", err)
ch <- err
return
return err
}

// Send Message through websocket connection
err = conn.writeMessage(websocket.TextMessage, bytes)
if err != nil {
log.Println("error writeMessage:", err)
ch <- err
return
return err
}
}
}(ch)

return ch
}
}

func serializeConsoleLogInfo(l *madmin.LogInfo) (logInfo madmin.LogInfo) {
Expand Down
68 changes: 13 additions & 55 deletions restapi/admin_console_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"testing"

"github.com/gorilla/websocket"
"github.com/minio/minio/pkg/madmin"
"github.com/stretchr/testify/assert"
)
Expand All @@ -39,11 +38,13 @@ func TestAdminConsoleLog(t *testing.T) {
assert := assert.New(t)
adminClient := adminClientMock{}
mockWSConn := mockConn{}
function := "startConsoleLog()"
function := "startConsoleLog(ctx, )"
ctx := context.Background()

testReceiver := make(chan madmin.LogInfo, 5)
textToReceive := "test message"
testStreamSize := 5
isClosed := false // testReceiver is closed?

// Test-1: Serve Console with no errors until Console finishes sending
// define mock function behavior for minio server Console
Expand All @@ -63,26 +64,24 @@ func TestAdminConsoleLog(t *testing.T) {
}(ch)
return ch
}
// mock function of conn.ReadMessage(), no error on read
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, nil
}
writesCount := 1
// mock connection WriteMessage() no error
connWriteMessageMock = func(messageType int, data []byte) error {
// emulate that receiver gets the message written
var t madmin.LogInfo
_ = json.Unmarshal(data, &t)
if writesCount == testStreamSize {
// for testing we need to close the receiver channel
close(testReceiver)
if !isClosed {
close(testReceiver)
isClosed = true
}
return nil
}
testReceiver <- t
writesCount++
return nil
}
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
if err := startConsoleLog(ctx, mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}
// check that the TestReceiver got the same number of data from Console.
Expand All @@ -94,50 +93,11 @@ func TestAdminConsoleLog(t *testing.T) {
connWriteMessageMock = func(messageType int, data []byte) error {
return fmt.Errorf("error on write")
}
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
if err := startConsoleLog(ctx, mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on write", err.Error())
}

// Test-3: error happens while reading, unexpected Close Error should return error.
connWriteMessageMock = func(messageType int, data []byte) error {
return nil
}
// mock function of conn.ReadMessage(), returns unexpected Close Error CloseAbnormalClosure
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: ""}
}
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("websocket: close 1006 (abnormal closure)", err.Error())
}

// Test-4: error happens while reading, expected Close Error NormalClosure
// expected Close Error should not return an error, just end Console.
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: ""}
}
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}

// Test-5: error happens while reading, expected Close Error CloseGoingAway
// expected Close Error should not return an error, just return.
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, &websocket.CloseError{Code: websocket.CloseGoingAway, Text: ""}
}
if err := startConsoleLog(mockWSConn, adminClient); err != nil {
t.Errorf("Failed on %s:, error occurred: %s", function, err.Error())
}

// Test-6: error happens while reading, non Close Error Type should be returned as
// error
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, fmt.Errorf("error on read")
}
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on read", err.Error())
}

// Test-7: error happens on GetLogs Minio, Console should stop
// Test-3: error happens on GetLogs Minio, Console should stop
// and error shall be returned.
minioGetLogsMock = func(ctx context.Context, node string, lineCnt int, logKind string) <-chan madmin.LogInfo {
ch := make(chan madmin.LogInfo)
Expand All @@ -156,12 +116,10 @@ func TestAdminConsoleLog(t *testing.T) {
}(ch)
return ch
}
// mock function of conn.ReadMessage(), no error on read, should stay unless
// context is done.
connReadMessageMock = func() (messageType int, p []byte, err error) {
return 0, []byte{}, nil
connWriteMessageMock = func(messageType int, data []byte) error {
return nil
}
if err := startConsoleLog(mockWSConn, adminClient); assert.Error(err) {
if err := startConsoleLog(ctx, mockWSConn, adminClient); assert.Error(err) {
assert.Equal("error on Console", err.Error())
}
}
94 changes: 20 additions & 74 deletions restapi/admin_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"log"
"net/http"
"strings"
"sync"

"github.com/gorilla/websocket"
"github.com/minio/minio/pkg/madmin"
Expand All @@ -50,93 +49,40 @@ type callStats struct {
}

// startTraceInfo starts trace of the servers
// by first setting a websocket reader that will
// check for a heartbeat.
//
// A WaitGroup is used to handle goroutines and to ensure
// all finish in the proper order. If any, sendTraceInfo()
// or wsReadCheck() returns, trace should end.
func startTraceInfo(conn WSConn, client MinioAdmin) (mError error) {
// a WaitGroup waits for a collection of goroutines to finish
wg := sync.WaitGroup{}
// a cancel context is needed to end all goroutines used
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Set number of goroutines to wait. wg.Wait()
// waitsuntil counter is zero (all are done)
wg.Add(3)
// start go routine for reading websocket heartbeat
readErr := wsReadCheck(ctx, &wg, conn)
// send Stream of Trace Info to the ws c.connection
traceCh := sendTraceInfo(ctx, &wg, conn, client)
// If wsReadCheck returns it means that it is not possible to check
// ws heartbeat anymore so we stop from doing trace, cancel context
// for all goroutines.
go func(wg *sync.WaitGroup) {
defer wg.Done()
if err := <-readErr; err != nil {
log.Println("error on wsReadCheck:", err)
mError = err
}
// cancel context for all goroutines.
cancel()
}(&wg)

// get traceCh error on finish
if err := <-traceCh; err != nil {
mError = err
}

// if traceCh closes for any reason,
// cancel context for all goroutines
cancel()
// wait all goroutines to finish
wg.Wait()
return mError
}

// sendTraceInfo sends stream of Trace Info to the ws connection
func sendTraceInfo(ctx context.Context, wg *sync.WaitGroup, conn WSConn, client MinioAdmin) <-chan error {
// decrements the WaitGroup counter
// by one when the function returns
defer wg.Done()
ch := make(chan error)
go func(ch chan<- error) {
defer close(ch)

// trace all traffic
allTraffic := true
// Trace failed requests only
errOnly := false
// Start listening on all trace activity.
traceCh := client.serviceTrace(ctx, allTraffic, errOnly)

for traceInfo := range traceCh {
func startTraceInfo(ctx context.Context, conn WSConn, client MinioAdmin) error {
// trace all traffic
allTraffic := true
// Trace failed requests only
errOnly := false
// Start listening on all trace activity.
traceCh := client.serviceTrace(ctx, allTraffic, errOnly)
for {
select {
case <-ctx.Done():
return nil
case traceInfo, ok := <-traceCh:
// zero value returned because the channel is closed and empty
if !ok {
return nil
}
if traceInfo.Err != nil {
log.Println("error on serviceTrace:", traceInfo.Err)
ch <- traceInfo.Err
return
return traceInfo.Err
}
// Serialize message to be sent
traceInfoBytes, err := json.Marshal(shortTrace(&traceInfo))
if err != nil {
fmt.Println("error on json.Marshal:", err)
ch <- err
return
return err
}
// Send Message through websocket connection
err = conn.writeMessage(websocket.TextMessage, traceInfoBytes)
if err != nil {
log.Println("error writeMessage:", err)
ch <- err
return
return err
}
}
// TODO: verbose
}(ch)

return ch
}
}

// shortTrace creates a shorter Trace Info message.
Expand Down
Loading