Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ASoC 2022] Optimization of Pixiu timeout feature #475

Merged
merged 16 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat:timeout optimization
  • Loading branch information
CSWYF3634076 committed Aug 14, 2022
commit 7a0f3f45821e373dc190620d492be2826a600518
2 changes: 1 addition & 1 deletion pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Client interface {
// Apply to init client
Apply() error

// Close close the clinet
// Close close the client
Close() error

// Call invoke the downstream service.
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/dubbo/dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
span.SetAttributes(attribute.Key(spanTagValues).String(string(finalValues)))
defer span.End()
ctx := context.WithValue(req.Context, constant.TracingRemoteSpanCtx, trace.SpanFromContext(req.Context).SpanContext())
ctx, cancel := context.WithTimeout(ctx, req.Timeout)
defer cancel()
rst, err := gs.Invoke(ctx, method, types, vals)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions pkg/client/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"net/url"
"strings"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -107,7 +106,7 @@ func (dc *Client) Call(req *client.Request) (resp interface{}, err error) {

newReq, _ := http.NewRequest(req.IngressRequest.Method, targetURL, params.Body)
newReq.Header = params.Header
httpClient := &http.Client{Timeout: 5 * time.Second}
httpClient := &http.Client{Timeout: req.Timeout}

tr := otel.Tracer(traceNameHTTPClient)
_, span := tr.Start(req.Context, "HTTP "+newReq.Method, trace.WithSpanKind(trace.SpanKindClient))
Expand Down
9 changes: 5 additions & 4 deletions pkg/client/mq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type (
}

KafkaProducerConfig struct {
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Brokers []string `yaml:"brokers" json:"brokers"`
ProtocolVersion string `yaml:"protocol_version" json:"protocol_version"`
Metadata Metadata `yaml:"metadata" json:"metadata"`
Producer Producer `yaml:"producer" json:"producer"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

Metadata struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/client/mq/kafka_facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ func NewKafkaProviderFacade(config KafkaProducerConfig) (*KafkaProducerFacade, e
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
c.Producer.Timeout = config.Timeout
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/client/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewMQClient(config Config) (*Client, error) {
ctx := context.Background()
switch config.MqType {
case constant.MQTypeKafka:
config.KafkaProducerConfig.Timeout = config.Timeout
pf, err := NewKafkaProviderFacade(config.KafkaProducerConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,7 +126,9 @@ func (c Client) Call(req *client.Request) (res interface{}, err error) {
consumerFacadeMap.Store(cReq.ConsumerGroup, facade)
if f, ok := consumerFacadeMap.Load(cReq.ConsumerGroup); ok {
cf := f.(ConsumerFacade)
err = cf.Subscribe(c.ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
ctx, cancel := context.WithTimeout(c.ctx, req.Timeout)
defer cancel()
err = cf.Subscribe(ctx, WithTopics(cReq.TopicList), WithConsumeUrl(cReq.ConsumeUrl), WithCheckUrl(cReq.CheckUrl), WithConsumerGroup(cReq.ConsumerGroup))
if err != nil {
facade.Stop()
consumerFacadeMap.Delete(cReq.ConsumerGroup)
Expand Down
2 changes: 2 additions & 0 deletions pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package client
import (
"context"
"net/http"
"time"
)

import (
Expand All @@ -32,6 +33,7 @@ type Request struct {
Context context.Context
IngressRequest *http.Request
API router.API
Timeout time.Duration
}

// NewReq create a request
Expand Down
8 changes: 7 additions & 1 deletion pkg/client/triple/triple.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"sync"
"time"
)

import (
Expand All @@ -34,6 +35,7 @@ import (

import (
"github.com/apache/dubbo-go-pixiu/pkg/client"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

// InitDefaultTripleClient init default dubbo client
Expand Down Expand Up @@ -89,7 +91,11 @@ func (dc *Client) Call(req *client.Request) (res interface{}, err error) {
}
meta := make(map[string][]string)
reqData, _ := ioutil.ReadAll(req.IngressRequest.Body)
call, err := p.Call(context.Background(), req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
logger.Debugf("[dubbo-go-pixiu] client timeout val %v", req.Timeout)
ctx, cancel := context.WithTimeout(context.Background(), req.Timeout)
defer cancel()
time.Sleep(100 * time.Nanosecond)
call, err := p.Call(ctx, req.API.Method.IntegrationRequest.Interface, req.API.Method.IntegrationRequest.Method, reqData, (*proxymeta.Metadata)(&meta))
if err != nil {
return "", errors.Errorf("call triple server error = %s", err)
}
Expand Down
20 changes: 18 additions & 2 deletions pkg/common/constant/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package constant

import "time"
import (
"fmt"
"time"
)

var (
Default403Body = []byte("403 for bidden")
Expand All @@ -43,5 +46,18 @@ const (
// console
Console = "console"

DefaultReqTimeout = 10 * time.Second
DefaultReqTimeout = 1 * time.Nanosecond
)

func ResolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration {
fmt.Printf("timeout parse %s : %d", currentV, defaultV)
if currentV == "" {
return defaultV
} else {
if duration, err := time.ParseDuration(currentV); err != nil {
return defaultV
} else {
return duration
}
}
}
4 changes: 4 additions & 0 deletions pkg/common/extension/filter/filter_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func (c *defaultFilterChain) AppendEncodeFilters(f ...HttpEncodeFilter) {
}

func (c *defaultFilterChain) OnDecode(ctx *http.HttpContext) {

for ; c.decodeFiltersIndex < len(c.decodeFilters); c.decodeFiltersIndex++ {

//logger.Debugf("[dubbo-go-pixiu] client Before Api timout :%v", ctx.Timeout)
//logger.Debugf("[dubbo-go-pixiu] client filter :%v", c.decodeFilters[c.decodeFiltersIndex])
filterStatus := c.decodeFilters[c.decodeFiltersIndex].Decode(ctx)

switch filterStatus {
Expand Down
19 changes: 1 addition & 18 deletions pkg/common/grpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"io/ioutil"
"net"
stdHttp "net/http"
"time"
)

import (
Expand All @@ -38,7 +37,6 @@ import (
)

import (
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
router2 "github.com/apache/dubbo-go-pixiu/pkg/common/router"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
Expand All @@ -51,14 +49,12 @@ type GrpcConnectionManager struct {
filter.EmptyNetworkFilter
config *model.GRPCConnectionManagerConfig
routerCoordinator *router2.RouterCoordinator
timeout time.Duration
}

// CreateGrpcConnectionManager create grpc connection manager
func CreateGrpcConnectionManager(hcmc *model.GRPCConnectionManagerConfig) *GrpcConnectionManager {
hcm := &GrpcConnectionManager{config: hcmc}
hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig)
hcm.timeout = resolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
return hcm
}

Expand All @@ -82,10 +78,9 @@ func (gcm *GrpcConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
gcm.writeStatus(w, status.New(codes.Unknown, "can't find endpoint in cluster"))
return
}

ctx := context.Background()
// timeout
ctx, cancel := context.WithTimeout(ctx, gcm.timeout)
ctx, cancel := context.WithTimeout(ctx, gcm.config.Timeout)
defer cancel()
newReq := r.Clone(ctx)
newReq.URL.Scheme = "http"
Expand Down Expand Up @@ -161,15 +156,3 @@ func copyHeader(dst, src stdHttp.Header) {
}
}
}

func resolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration {
if currentV == "" {
return defaultV
} else {
if duration, err := time.ParseDuration(currentV); err != nil {
return defaultV
} else {
return duration
}
}
}
17 changes: 1 addition & 16 deletions pkg/common/http/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"io/ioutil"
stdHttp "net/http"
"sync"
"time"
)

import (
Expand All @@ -48,7 +47,6 @@ type HttpConnectionManager struct {
routerCoordinator *router2.RouterCoordinator
filterManager *filter.FilterManager
pool sync.Pool
timeout time.Duration
}

// CreateHttpConnectionManager create http connection manager
Expand All @@ -60,7 +58,6 @@ func CreateHttpConnectionManager(hcmc *model.HttpConnectionManagerConfig) *HttpC
hcm.routerCoordinator = router2.CreateRouterCoordinator(&hcmc.RouteConfig)
hcm.filterManager = filter.NewFilterManager(hcmc.HTTPFilters)
hcm.filterManager.Load()
hcm.timeout = resolveTimeStr2Time(hcmc.TimeoutStr, constant.DefaultReqTimeout)
return hcm
}

Expand All @@ -87,7 +84,7 @@ func (hcm *HttpConnectionManager) ServeHTTP(w stdHttp.ResponseWriter, r *stdHttp
hc.Writer = w
hc.Request = r
hc.Reset()
hc.Timeout = hcm.timeout
hc.Timeout = hcm.config.Timeout
err := hcm.Handle(hc)
if err != nil {
logger.Errorf("ServeHTTP %v", err)
Expand Down Expand Up @@ -168,15 +165,3 @@ func (hcm *HttpConnectionManager) findRoute(hc *pch.HttpContext) error {
hc.RouteEntry(ra)
return nil
}

func resolveTimeStr2Time(currentV string, defaultV time.Duration) time.Duration {
if currentV == "" {
return defaultV
} else {
if duration, err := time.ParseDuration(currentV); err != nil {
return defaultV
} else {
return duration
}
}
}
4 changes: 3 additions & 1 deletion pkg/context/http/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ func (hc *HttpContext) LocalReply() bool {

// API sets the API to http context
func (hc *HttpContext) API(api router.API) {
hc.Timeout = api.Timeout
if hc.Timeout > api.Timeout {
hc.Timeout = api.Timeout
}
hc.Api = &api
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/auth/jwt/jwt.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {

//logger.Debugf("[dubbo-go-pixiu] client Before Api timout jwt :%v", ctx.Timeout)
path := ctx.Request.RequestURI

router := false
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/authority/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
//logger.Debugf("[dubbo-go-pixiu] client Before Api timout authority :%v", c.Timeout)
for _, r := range f.cfg.Rules {
item := c.GetClientIP()
if r.Limit == App {
Expand Down
2 changes: 2 additions & 0 deletions pkg/filter/cors/cors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cors

import (
"github.com/apache/dubbo-go-pixiu/pkg/logger"
stdHttp "net/http"
)

Expand Down Expand Up @@ -79,6 +80,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
logger.Debugf("[dubbo-go-pixiu] client Before Api timout cors :%v", ctx.Timeout)
f.handleCors(ctx)
return filter.Continue
}
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/csrf/csrf.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
//logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", ctx.Timeout)
ctx.Request.Header.Set(csrfSecret, f.cfg.Secret)

if inMethod(f.cfg.IgnoreMethods, ctx.Request.Method) {
Expand Down
21 changes: 4 additions & 17 deletions pkg/filter/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package event
import (
"fmt"
sdkhttp "net/http"
"time"
)

import (
Expand Down Expand Up @@ -71,24 +70,12 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
//logger.Debugf("[dubbo-go-pixiu] client Before Api timout event :%v", ctx.Timeout)
f.cfg.Timeout = ctx.Timeout
mqClient := mq.NewSingletonMQClient(*f.cfg)
req := client.NewReq(ctx.Request.Context(), ctx.Request, *ctx.GetAPI())

var resp interface{}
var err error
respCh := make(chan struct{})
go func() {
resp, err = mqClient.Call(req)
close(respCh)
}()
select {
case <-respCh:
case <-time.After(ctx.Timeout):
logger.Errorf("[dubbo-go-pixiu] event client call timeout err!")
return filter.Stop
}

//resp, err := mqClient.Call(req)
req.Timeout = ctx.Timeout
resp, err := mqClient.Call(req)
if err != nil {
logger.Errorf("[dubbo-go-pixiu] event client call err:%v!", err)
ctx.SendLocalReply(sdkhttp.StatusInternalServerError, []byte(fmt.Sprintf("event client call err:%v", err)))
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain fi
}

func (f *Filter) Decode(hc *http.HttpContext) filter.FilterStatus {
//logger.Debugf("[dubbo-go-pixiu] client Before Api timout call :%v", hc.Timeout)
api := hc.GetAPI()
headers := api.Headers
if len(headers) == 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/filter/http/apiconfig/api_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, c
}

func (f *Filter) Decode(ctx *contexthttp.HttpContext) filter.FilterStatus {
logger.Debugf("[dubbo-go-pixiu] client Before Api timout apiconfig :%v", ctx.Timeout)
req := ctx.Request
v, err := f.apiService.MatchAPI(req.URL.Path, fc.HTTPVerb(req.Method))
if err != nil {
Expand Down
Loading