Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix golint issues in core/stat #515

Merged
merged 2 commits into from
Feb 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions core/service/servicegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,29 @@ type (
Stopper
}

// A Group is a group of services.
Group struct {
// A ServiceGroup is a group of services.
ServiceGroup struct {
services []Service
stopOnce func()
}
)

// NewGroup returns a Group.
func NewGroup() *Group {
sg := new(Group)
// NewServiceGroup returns a ServiceGroup.
func NewServiceGroup() *ServiceGroup {
sg := new(ServiceGroup)
sg.stopOnce = syncx.Once(sg.doStop)
return sg
}

// Add adds service into sg.
func (sg *Group) Add(service Service) {
func (sg *ServiceGroup) Add(service Service) {
sg.services = append(sg.services, service)
}

// Start starts the Group.
// Start starts the ServiceGroup.
// There should not be any logic code after calling this method, because this method is a blocking one.
// Also, quitting this method will close the logx output.
func (sg *Group) Start() {
func (sg *ServiceGroup) Start() {
proc.AddShutdownListener(func() {
log.Println("Shutting down...")
sg.stopOnce()
Expand All @@ -56,12 +56,12 @@ func (sg *Group) Start() {
sg.doStart()
}

// Stop stops the Group.
func (sg *Group) Stop() {
// Stop stops the ServiceGroup.
func (sg *ServiceGroup) Stop() {
sg.stopOnce()
}

func (sg *Group) doStart() {
func (sg *ServiceGroup) doStart() {
routineGroup := threading.NewRoutineGroup()

for i := range sg.services {
Expand All @@ -74,7 +74,7 @@ func (sg *Group) doStart() {
routineGroup.Wait()
}

func (sg *Group) doStop() {
func (sg *ServiceGroup) doStop() {
for _, service := range sg.services {
service.Stop()
}
Expand Down
6 changes: 3 additions & 3 deletions core/service/servicegroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestServiceGroup(t *testing.T) {
multipliers := []int{2, 3, 5, 7}
want := 1

group := NewGroup()
group := NewServiceGroup()
for _, multiplier := range multipliers {
want *= multiplier
service := newMockedService(multiplier)
Expand All @@ -68,7 +68,7 @@ func TestServiceGroup_WithStart(t *testing.T) {
var wait sync.WaitGroup
var lock sync.Mutex
wait.Add(len(multipliers))
group := NewGroup()
group := NewServiceGroup()
for _, multiplier := range multipliers {
var mul = multiplier
group.Add(WithStart(func() {
Expand All @@ -95,7 +95,7 @@ func TestServiceGroup_WithStarter(t *testing.T) {
var wait sync.WaitGroup
var lock sync.Mutex
wait.Add(len(multipliers))
group := NewGroup()
group := NewServiceGroup()
for _, multiplier := range multipliers {
var mul = multiplier
group.Add(WithStarter(mockedStarter{
Expand Down
2 changes: 2 additions & 0 deletions core/stat/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func init() {
}
}

// Report reports given message.
func Report(msg string) {
lock.RLock()
fn := reporter
Expand All @@ -63,6 +64,7 @@ func Report(msg string) {
}
}

// SetReporter sets the given reporter.
func SetReporter(fn func(string)) {
lock.Lock()
defer lock.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions core/stat/alert_polyfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

package stat

// Report reports given message.
func Report(string) {
}

// SetReporter sets the given reporter.
func SetReporter(func(string)) {
}
32 changes: 24 additions & 8 deletions core/stat/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ import (

"github.com/tal-tech/go-zero/core/executors"
"github.com/tal-tech/go-zero/core/logx"
"github.com/tal-tech/go-zero/core/syncx"
)

var (
LogInterval = time.Minute

logInterval = time.Minute
writerLock sync.Mutex
reportWriter Writer = nil
logEnabled = syncx.ForAtomicBool(true)
)

type (
// Writer interface wraps the Write method.
Writer interface {
Write(report *StatReport) error
}

// A StatReport is a stat report entry.
StatReport struct {
Name string `json:"name"`
Timestamp int64 `json:"tm"`
Expand All @@ -34,40 +37,51 @@ type (
Top99p9th float32 `json:"t99p9"`
}

// A Metrics is used to log and report stat reports.
Metrics struct {
executor *executors.PeriodicalExecutor
container *metricsContainer
}
)

// DisableLog disables logs of stats.
func DisableLog() {
logEnabled.Set(false)
}

// SetReportWriter sets the report writer.
func SetReportWriter(writer Writer) {
writerLock.Lock()
reportWriter = writer
writerLock.Unlock()
}

// NewMetrics returns a Metrics.
func NewMetrics(name string) *Metrics {
container := &metricsContainer{
name: name,
pid: os.Getpid(),
}

return &Metrics{
executor: executors.NewPeriodicalExecutor(LogInterval, container),
executor: executors.NewPeriodicalExecutor(logInterval, container),
container: container,
}
}

// Add adds task to m.
func (m *Metrics) Add(task Task) {
m.executor.Add(task)
}

// AddDrop adds a drop to m.
func (m *Metrics) AddDrop() {
m.executor.Add(Task{
Drop: true,
})
}

// SetName sets the name of m.
func (m *Metrics) SetName(name string) {
m.executor.Sync(func() {
m.container.name = name
Expand Down Expand Up @@ -113,7 +127,7 @@ func (c *metricsContainer) Execute(v interface{}) {
Name: c.name,
Timestamp: time.Now().Unix(),
Pid: c.pid,
ReqsPerSecond: float32(size) / float32(LogInterval/time.Second),
ReqsPerSecond: float32(size) / float32(logInterval/time.Second),
Drops: drops,
}

Expand Down Expand Up @@ -192,10 +206,12 @@ func getTopDuration(tasks []Task) float32 {

func log(report *StatReport) {
writeReport(report)
logx.Statf("(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, "+
"90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms",
report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median,
report.Top90th, report.Top99th, report.Top99p9th)
if logEnabled.True() {
logx.Statf("(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, "+
"90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms",
report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median,
report.Top90th, report.Top99th, report.Top99p9th)
}
}

func writeReport(report *StatReport) {
Expand Down
5 changes: 5 additions & 0 deletions core/stat/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/tal-tech/go-zero/core/logx"
)

func TestMetrics(t *testing.T) {
logx.Disable()
DisableLog()
defer logEnabled.Set(true)

counts := []int{1, 5, 10, 100, 1000, 1000}
for _, count := range counts {
m := NewMetrics("foo")
Expand Down
3 changes: 3 additions & 0 deletions core/stat/remotewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ import (

const httpTimeout = time.Second * 5

// ErrWriteFailed is an error that indicates failed to submit a StatReport.
var ErrWriteFailed = errors.New("submit failed")

// A RemoteWriter is a writer to write StatReport.
type RemoteWriter struct {
endpoint string
}

// NewRemoteWriter returns a RemoteWriter.
func NewRemoteWriter(endpoint string) Writer {
return &RemoteWriter{
endpoint: endpoint,
Expand Down
1 change: 1 addition & 0 deletions core/stat/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stat

import "time"

// A Task is a task that is reported to Metrics.
type Task struct {
Drop bool
Duration time.Duration
Expand Down
1 change: 1 addition & 0 deletions core/stat/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
}()
}

// CpuUsage returns current cpu usage.
func CpuUsage() int64 {
return atomic.LoadInt64(&cpuUsage)
}
Expand Down