Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #102] 修复 circuitbreaker 熔断指标上报 prometheus 出现 nil 导致 panic #104

Merged
merged 8 commits into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/circuitbreaker/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 172.18.0.1:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
2 changes: 1 addition & 1 deletion examples/circuitbreaker/provider/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 172.18.0.1:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
17 changes: 0 additions & 17 deletions examples/quickstart/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/polarismesh/polaris-go"
)
Expand Down Expand Up @@ -121,22 +120,6 @@ func (svr *PolarisProvider) deregisterService() {
log.Printf("deregister successfully.")
}

func (svr *PolarisProvider) doHeartbeat() {
log.Printf("start to invoke heartbeat operation")
ticker := time.NewTicker(time.Duration(5 * time.Second))
for range ticker.C {
if !svr.isShutdown {
heartbeatRequest := &polaris.InstanceHeartbeatRequest{}
heartbeatRequest.Namespace = namespace
heartbeatRequest.Service = service
heartbeatRequest.Host = svr.host
heartbeatRequest.Port = svr.port
heartbeatRequest.ServiceToken = token
svr.provider.Heartbeat(heartbeatRequest)
}
}
}

func (svr *PolarisProvider) runMainLoop() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
Expand Down
2 changes: 1 addition & 1 deletion examples/ratelimit/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 9.134.5.52:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
Binary file modified examples/ratelimit/image/create_service_ratelimit.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion examples/ratelimit/provider/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
global:
serverConnector:
addresses:
- 9.134.5.52:8091
- 127.0.0.1:8091
statReporter:
enable: true
chain:
Expand Down
22 changes: 11 additions & 11 deletions examples/route/dynamic/README-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ global:

```
# linux/mac运行命令
./provider --metadata="env=dev" > provider-20000.log 2>&1 &
./provider --metadata="env=test" > provider-20001.log 2>&1 &
./provider --metadata="env=pre" > provider-20002.log 2>&1 &
./provider --metadata="env=prod" > provider-20003.log 2>&1 &
./provider --port="20000" --metadata="env=dev" > provider-20000.log 2>&1 &
./provider --port="20001" --metadata="env=test" > provider-20001.log 2>&1 &
./provider --port="20002" --metadata="env=pre" > provider-20002.log 2>&1 &
./provider --port="20003" --metadata="env=prod" > provider-20003.log 2>&1 &

# windows运行命令
./provider.exe --metadata="env=dev" > provider-20000.log
./provider.exe --metadata="env=test" > provider-20001.log
./provider.exe --metadata="env=pre" > provider-20002.log
./provider.exe --metadata="env=prod" > provider-20003.log
./provider.exe --port="20000" --metadata="env=dev" > provider-20000.log
./provider.exe --port="20001" --metadata="env=test" > provider-20001.log
./provider.exe --port="20002" --metadata="env=pre" > provider-20002.log
./provider.exe --port="20003" --metadata="env=prod" > provider-20003.log
```

运行构建出的**consumer**可执行文件
Expand All @@ -76,18 +76,18 @@ global:

```
# linux/mac运行命令
./consumer --selfNamespace={selfName} --selfService=EchoConsumer
./consumer

# windows运行命令
./consumer.exe --selfNamespace={selfName} --selfService=EchoConsumer
./consumer.exe
```

### 验证

通过设置请求头参数***env***的值,实现路由到不同的服务实例

```
curl -H 'env: pre' http://127.0.0.1:18080/echo
curl http://127.0.0.1:18080/echo?env=pre

Hello, I'm RouteEchoServer Provider, My metadata's : env=pre, host : x.x.x.x:x
```
6 changes: 3 additions & 3 deletions examples/route/dynamic/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,18 @@ Run the built **consumer** executable

```
# linux/mac
./consumer --selfNamespace={selfName} --selfService=EchoConsumer
./consumer

# windows
./consumer.exe --selfNamespace={selfName} --selfService=EchoConsumer
./consumer.exe
```

### Verify

Realize the route to different service instances by setting the value of the request header **env**

```
curl -H 'env: pre' http://127.0.0.1:18080/echo
curl http://127.0.0.1:18080/echo?env=pre

Hello, I'm RouteEchoServer Provider, My metadata's : env=pre, host : x.x.x.x:x
```
40 changes: 40 additions & 0 deletions examples/route/dynamic/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"strings"
"time"
Expand All @@ -37,6 +38,7 @@ var (
selfNamespace string
selfService string
port int64
token string
)

func initArgs() {
Expand All @@ -45,18 +47,42 @@ func initArgs() {
flag.StringVar(&selfNamespace, "selfNamespace", "default", "selfNamespace")
flag.StringVar(&selfService, "selfService", "", "selfService")
flag.Int64Var(&port, "port", 18080, "port")
flag.StringVar(&token, "token", "", "token")
}

// PolarisConsumer .
type PolarisConsumer struct {
consumer polaris.ConsumerAPI
router polaris.RouterAPI
provider polaris.ProviderAPI
namespace string
service string
}

// Run .
func (svr *PolarisConsumer) Run() {
if selfService != "" && selfNamespace != "" {
tmpHost, err := getLocalHost(svr.provider.SDKContext().GetConfig().GetGlobal().GetServerConnector().GetAddresses()[0])
if nil != err {
panic(fmt.Errorf("error occur while fetching localhost: %v", err))
}
req := &polaris.InstanceRegisterRequest{}
req.Namespace = selfNamespace
req.Service = selfService
log.Printf("start to invoke register operation")
registerRequest := &polaris.InstanceRegisterRequest{}
registerRequest.Service = service
registerRequest.Namespace = namespace
registerRequest.Host = tmpHost
registerRequest.Port = int(port)
registerRequest.ServiceToken = token
resp, err := svr.provider.RegisterInstance(registerRequest)
if nil != err {
log.Fatalf("fail to register instance, err is %v", err)
}
log.Printf("register response: instanceId %s", resp.InstanceID)
}

svr.runWebServer()
}

Expand Down Expand Up @@ -155,6 +181,7 @@ func main() {
svr := &PolarisConsumer{
consumer: polaris.NewConsumerAPIByContext(sdkCtx),
router: polaris.NewRouterAPIByContext(sdkCtx),
provider: polaris.NewProviderAPIByContext(sdkCtx),
namespace: namespace,
service: service,
}
Expand All @@ -177,3 +204,16 @@ func convertQuery(rawQuery string) map[string]string {
}
return meta
}

func getLocalHost(serverAddr string) (string, error) {
conn, err := net.Dial("tcp", serverAddr)
if nil != err {
return "", err
}
localAddr := conn.LocalAddr().String()
colonIdx := strings.LastIndex(localAddr, ":")
if colonIdx > 0 {
return localAddr[:colonIdx], nil
}
return localAddr, nil
}
32 changes: 16 additions & 16 deletions examples/route/dynamic/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,22 @@ func (svr *PolarisProvider) Run() {
runMainLoop()
}

func (svr *PolarisProvider) registerService() {
log.Printf("start to invoke register operation")
registerRequest := &polaris.InstanceRegisterRequest{}
registerRequest.Service = service
registerRequest.Namespace = namespace
registerRequest.Host = host
registerRequest.Port = svr.port
registerRequest.ServiceToken = token
registerRequest.Metadata = convertMetadatas()
resp, err := svr.provider.RegisterInstance(registerRequest)
if nil != err {
log.Fatalf("fail to register instance, err is %v", err)
}
log.Printf("register response: instanceId %s", resp.InstanceID)
}

func (svr *PolarisProvider) runWebServer() {
http.HandleFunc("/echo", func(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
Expand All @@ -112,22 +128,6 @@ func (svr *PolarisProvider) runWebServer() {

}

func (svr *PolarisProvider) registerService() {
log.Printf("start to invoke register operation")
registerRequest := &polaris.InstanceRegisterRequest{}
registerRequest.Service = service
registerRequest.Namespace = namespace
registerRequest.Host = host
registerRequest.Port = svr.port
registerRequest.ServiceToken = token
registerRequest.Metadata = convertMetadatas()
resp, err := svr.provider.RegisterInstance(registerRequest)
if nil != err {
log.Fatalf("fail to register instance, err is %v", err)
}
log.Printf("register response: instanceId %s", resp.InstanceID)
}

func runMainLoop() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
Expand Down
4 changes: 2 additions & 2 deletions examples/route/nearby/consumer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ go 1.17

require github.com/polarismesh/polaris-go v1.2.0-beta.3

replace github.com/polarismesh/polaris-go => ../../../../

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
Expand Down Expand Up @@ -32,5 +34,3 @@ require (
google.golang.org/protobuf v1.28.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/polarismesh/polaris-go => ../../../../
2 changes: 1 addition & 1 deletion pkg/model/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ type CircuitBreakGauge struct {
CBStatus CircuitBreakerStatus
}

// GetCircuitBreakerStatus 获取变化前的熔断状态
// GetCircuitBreakerStatus 获取当前实例熔断状态
func (cbg *CircuitBreakGauge) GetCircuitBreakerStatus() CircuitBreakerStatus {
return cbg.CBStatus
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/model/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,31 @@ const (
RouteStat
)

func DescMetricType(t MetricType) string {
switch t {
case SDKAPIStat:
return "SDKAPIStat"
case ServiceStat:
return "ServiceStat"
case InstanceStat:
return "InstanceStat"
case SDKCfgStat:
return "SDKCfgStat"
case CircuitBreakStat:
return "CircuitBreakStat"
case PluginAPIStat:
return "PluginAPIStat"
case LoadBalanceStat:
return "LoadBalanceStat"
case RateLimitStat:
return "RateLimitStat"
case RouteStat:
return "RouteStat"
default:
return "Unknown"
}
}

var metricTypes = HashSet{}

// ValidMetircType 检测是不是合法的统计类型.
Expand Down
2 changes: 1 addition & 1 deletion plugin/localregistry/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (g *LocalCache) UpdateInstances(svcUpdateReq *localregistry.ServiceUpdateRe
cbStatusUpdated = false
}
err := g.engine.SyncReportStat(model.CircuitBreakStat,
&model.CircuitBreakGauge{ChangeInstance: updateInstance, CBStatus: preCBStatus})
&model.CircuitBreakGauge{ChangeInstance: updateInstance, CBStatus: nextCBStatus})
if err != nil {
log.GetBaseLogger().Errorf("fail to report circuitbreak change, error %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/statreporter/prometheus/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,14 @@ var (
CircuitBreakerOpen = metricDesc{
Name: MetricsNameCircuitBreakerOpen,
Help: "total of opened circuit breaker",
MetricType: TypeForCounterVec,
MetricType: TypeForGaugeVec,
LabelNames: GetLabels(CircuitBreakerGaugeLabelOrder),
}

CircuitBreakerHalfOpen = metricDesc{
Name: MetricsNameCircuitBreakerHalfOpen,
Help: "total of half-open circuit breaker",
MetricType: TypeForCounterVec,
MetricType: TypeForGaugeVec,
LabelNames: GetLabels(CircuitBreakerGaugeLabelOrder),
}
)
Expand Down
9 changes: 5 additions & 4 deletions plugin/statreporter/prometheus/prometheus_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,21 +223,22 @@ func (p *PrometheusHandler) handleRateLimitGauge(metricsType model.MetricType, v
func (p *PrometheusHandler) handleCircuitBreakGauge(metricsType model.MetricType, val *model.CircuitBreakGauge) {
labels := p.convertCircuitBreakGaugeToLabels(val)

open := p.metricVecCaches[MetricsNameCircuitBreakerOpen].(*prometheus.CounterVec)
open := p.metricVecCaches[MetricsNameCircuitBreakerOpen].(*prometheus.GaugeVec)

// 计算完之后的熔断状态
status := val.GetCircuitBreakerStatus().GetStatus()
if status == model.Open {
open.With(labels).Inc()
} else {
open.With(labels).Add(-1)
open.With(labels).Dec()
}

halfOpen := p.metricVecCaches[MetricsNameCircuitBreakerHalfOpen].(*prometheus.CounterVec)
halfOpen := p.metricVecCaches[MetricsNameCircuitBreakerHalfOpen].(*prometheus.GaugeVec)

if status == model.HalfOpen {
halfOpen.With(labels).Inc()
} else {
halfOpen.With(labels).Add(-1)
halfOpen.With(labels).Dec()
}
}

Expand Down