Skip to content

Commit

Permalink
streamlog: make generic (#12494)
Browse files Browse the repository at this point in the history
* streamlog: make generic

Signed-off-by: Vicent Marti <vmg@strn.cat>

* gofmt

Signed-off-by: Vicent Marti <vmg@strn.cat>

---------

Signed-off-by: Vicent Marti <vmg@strn.cat>
  • Loading branch information
vmg authored Feb 28, 2023
1 parent 607a9a4 commit c04989b
Show file tree
Hide file tree
Showing 19 changed files with 77 additions and 195 deletions.
26 changes: 13 additions & 13 deletions go/streamlog/streamlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,11 @@ const (

// StreamLogger is a non-blocking broadcaster of messages.
// Subscribers can use channels or HTTP.
type StreamLogger struct {
type StreamLogger[T any] struct {
name string
size int
mu sync.Mutex
subscribed map[chan any]string
subscribed map[chan T]string
}

// LogFormatter is the function signature used to format an arbitrary
Expand All @@ -131,17 +131,17 @@ type LogFormatter func(out io.Writer, params url.Values, message any) error

// New returns a new StreamLogger that can stream events to subscribers.
// The size parameter defines the channel size for the subscribers.
func New(name string, size int) *StreamLogger {
return &StreamLogger{
func New[T any](name string, size int) *StreamLogger[T] {
return &StreamLogger[T]{
name: name,
size: size,
subscribed: make(map[chan any]string),
subscribed: make(map[chan T]string),
}
}

// Send sends message to all the writers subscribed to logger. Calling
// Send does not block.
func (logger *StreamLogger) Send(message any) {
func (logger *StreamLogger[T]) Send(message T) {
logger.mu.Lock()
defer logger.mu.Unlock()

Expand All @@ -158,31 +158,31 @@ func (logger *StreamLogger) Send(message any) {

// Subscribe returns a channel which can be used to listen
// for messages.
func (logger *StreamLogger) Subscribe(name string) chan any {
func (logger *StreamLogger[T]) Subscribe(name string) chan T {
logger.mu.Lock()
defer logger.mu.Unlock()

ch := make(chan any, logger.size)
ch := make(chan T, logger.size)
logger.subscribed[ch] = name
return ch
}

// Unsubscribe removes the channel from the subscription.
func (logger *StreamLogger) Unsubscribe(ch chan any) {
func (logger *StreamLogger[T]) Unsubscribe(ch chan T) {
logger.mu.Lock()
defer logger.mu.Unlock()

delete(logger.subscribed, ch)
}

// Name returns the name of StreamLogger.
func (logger *StreamLogger) Name() string {
func (logger *StreamLogger[T]) Name() string {
return logger.name
}

// ServeLogs registers the URL on which messages will be broadcast.
// It is safe to register multiple URLs for the same StreamLogger.
func (logger *StreamLogger) ServeLogs(url string, logf LogFormatter) {
func (logger *StreamLogger[T]) ServeLogs(url string, logf LogFormatter) {
http.HandleFunc(url, func(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
Expand Down Expand Up @@ -213,7 +213,7 @@ func (logger *StreamLogger) ServeLogs(url string, logf LogFormatter) {
//
// Returns the channel used for the subscription which can be used to close
// it.
func (logger *StreamLogger) LogToFile(path string, logf LogFormatter) (chan any, error) {
func (logger *StreamLogger[T]) LogToFile(path string, logf LogFormatter) (chan T, error) {
rotateChan := make(chan os.Signal, 1)
signal.Notify(rotateChan, syscall.SIGUSR2)

Expand Down Expand Up @@ -248,7 +248,7 @@ type Formatter interface {

// GetFormatter returns a formatter function for objects conforming to the
// Formatter interface
func GetFormatter(logger *StreamLogger) LogFormatter {
func GetFormatter[T any](logger *StreamLogger[T]) LogFormatter {
return func(w io.Writer, params url.Values, val any) error {
fmter, ok := val.(Formatter)
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions go/streamlog/streamlog_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestHTTP(t *testing.T) {

go http.Serve(l, nil)

logger := New("logger", 1)
logger := New[*logMessage]("logger", 1)
logger.ServeLogs("/log", testLogf)

// This should not block - there are no subscribers yet.
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestHTTP(t *testing.T) {
}

func TestChannel(t *testing.T) {
logger := New("logger", 1)
logger := New[*logMessage]("logger", 1)

// Subscribe.
ch := logger.Subscribe("test")
Expand All @@ -140,7 +140,7 @@ func TestChannel(t *testing.T) {
msg := fmt.Sprint("msg", i)
done := make(chan struct{})
go func() {
if want, got := msg+"\n", (<-ch).(*logMessage).Format(nil); got != want {
if want, got := msg+"\n", (<-ch).Format(nil); got != want {
t.Errorf("Unexpected message in log. got: %q, want: %q", got, want)
}
close(done)
Expand All @@ -158,7 +158,7 @@ func TestChannel(t *testing.T) {
for {
select {
case msg := <-ch:
got = append(got, msg.(*logMessage).Format(nil))
got = append(got, msg.Format(nil))
case <-writeDone:
close(readDone)
return
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestChannel(t *testing.T) {
}

func TestFile(t *testing.T) {
logger := New("logger", 10)
logger := New[*logMessage]("logger", 10)

dir := t.TempDir()

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"
"strings"

"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vindexes"

"vitess.io/vitess/go/cache"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (vte *VTExplain) initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts
vte.vtgateExecutor = vtgate.NewExecutor(context.Background(), vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, cache.DefaultConfig, schemaTracker, false, opts.PlannerVersion)

queryLogBufferSize := 10
vtgate.SetQueryLogger(streamlog.New("VTGate", queryLogBufferSize))
vtgate.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))

return nil
}
Expand Down
10 changes: 4 additions & 6 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,10 @@ func testNonZeroDuration(t *testing.T, what, d string) {
}
}

func getQueryLog(logChan chan any) *logstats.LogStats {
var log any

func getQueryLog(logChan chan *logstats.LogStats) *logstats.LogStats {
select {
case log = <-logChan:
return log.(*logstats.LogStats)
case log := <-logChan:
return log
default:
return nil
}
Expand All @@ -346,7 +344,7 @@ func getQueryLog(logChan chan any) *logstats.LogStats {
// is a repeat query.
var testPlannedQueries = map[string]bool{}

func testQueryLog(t *testing.T, logChan chan any, method, stmtType, sql string, shardQueries int) *logstats.LogStats {
func testQueryLog(t *testing.T, logChan chan *logstats.LogStats, method, stmtType, sql string, shardQueries int) *logstats.LogStats {
t.Helper()

logStats := getQueryLog(logChan)
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vtgate/querylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/vtgate/logstats"
)

var (
Expand All @@ -34,18 +35,18 @@ var (
QueryzHandler = "/debug/queryz"

// QueryLogger enables streaming logging of queries
QueryLogger *streamlog.StreamLogger
QueryLogger *streamlog.StreamLogger[*logstats.LogStats]
queryLoggerMu sync.Mutex
)

func SetQueryLogger(logger *streamlog.StreamLogger) {
func SetQueryLogger(logger *streamlog.StreamLogger[*logstats.LogStats]) {
queryLoggerMu.Lock()
defer queryLoggerMu.Unlock()
QueryLogger = logger
}

func initQueryLogger(vtg *VTGate) error {
SetQueryLogger(streamlog.New("VTGate", queryLogBufferSize))
SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))
QueryLogger.ServeLogs(QueryLogHandler, streamlog.GetFormatter(QueryLogger))

http.HandleFunc(QueryLogzHandler, func(w http.ResponseWriter, r *http.Request) {
Expand Down
15 changes: 2 additions & 13 deletions go/vt/vtgate/querylogz.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package vtgate

import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -86,7 +84,7 @@ var (

// querylogzHandler serves a human readable snapshot of the
// current query log.
func querylogzHandler(ch chan any, w http.ResponseWriter, r *http.Request) {
func querylogzHandler(ch chan *logstats.LogStats, w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.DEBUGGING); err != nil {
acl.SendError(w, err)
return
Expand All @@ -100,21 +98,12 @@ func querylogzHandler(ch chan any, w http.ResponseWriter, r *http.Request) {
defer tmr.Stop()
for i := 0; i < limit; i++ {
select {
case out := <-ch:
case stats := <-ch:
select {
case <-tmr.C:
return
default:
}
stats, ok := out.(*logstats.LogStats)
if !ok {
err := fmt.Errorf("unexpected value in %s: %#v (expecting value of type %T)", QueryLogger.Name(), out, &logstats.LogStats{})
_, _ = io.WriteString(w, `<tr class="error">`)
_, _ = io.WriteString(w, err.Error())
_, _ = io.WriteString(w, "</tr>")
log.Error(err)
continue
}
var level string
if stats.TotalTime().Seconds() < 0.01 {
level = "low"
Expand Down
23 changes: 5 additions & 18 deletions go/vt/vtgate/querylogz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vtgate

import (
"context"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -27,24 +28,10 @@ import (

"vitess.io/vitess/go/vt/vtgate/logstats"

"context"

"vitess.io/vitess/go/streamlog"
"vitess.io/vitess/go/vt/callerid"
)

func TestQuerylogzHandlerInvalidLogStats(t *testing.T) {
req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil)
response := httptest.NewRecorder()
ch := make(chan any, 1)
ch <- "test msg"
querylogzHandler(ch, response, req)
close(ch)
if !strings.Contains(response.Body.String(), "error") {
t.Fatalf("should show an error page for an non LogStats")
}
}

func TestQuerylogzHandlerFormatting(t *testing.T) {
req, _ := http.NewRequest("GET", "/querylogz?timeout=10&limit=1", nil)
logStats := logstats.NewLogStats(context.Background(), "Execute", "select name from test_table limit 1000", "suuid", nil)
Expand Down Expand Up @@ -84,7 +71,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) {
}
logStats.EndTime = logStats.StartTime.Add(1 * time.Millisecond)
response := httptest.NewRecorder()
ch := make(chan any, 1)
ch := make(chan *logstats.LogStats, 1)
ch <- logStats
querylogzHandler(ch, response, req)
close(ch)
Expand Down Expand Up @@ -114,7 +101,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) {
}
logStats.EndTime = logStats.StartTime.Add(20 * time.Millisecond)
response = httptest.NewRecorder()
ch = make(chan any, 1)
ch = make(chan *logstats.LogStats, 1)
ch <- logStats
querylogzHandler(ch, response, req)
close(ch)
Expand Down Expand Up @@ -143,7 +130,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) {
`</tr>`,
}
logStats.EndTime = logStats.StartTime.Add(500 * time.Millisecond)
ch = make(chan any, 1)
ch = make(chan *logstats.LogStats, 1)
ch <- logStats
querylogzHandler(ch, response, req)
close(ch)
Expand All @@ -153,7 +140,7 @@ func TestQuerylogzHandlerFormatting(t *testing.T) {
// ensure querylogz is not affected by the filter tag
streamlog.SetQueryLogFilterTag("XXX_SKIP_ME")
defer func() { streamlog.SetQueryLogFilterTag("") }()
ch = make(chan any, 1)
ch = make(chan *logstats.LogStats, 1)
ch <- logStats
querylogzHandler(ch, response, req)
close(ch)
Expand Down
Loading

0 comments on commit c04989b

Please sign in to comment.