Skip to content

Commit

Permalink
add stats tagger APIs and connection stats. (grpc#992)
Browse files Browse the repository at this point in the history
* add stats.tagger APIs and connection stats.

* fix comments

use ac.ctx in http2client
change name and comments
small fixes stats_tests

* add a TODO to ConnTagInfo

* rename handle to handleRPC

* modify stats comments
  • Loading branch information
menghanl authored and iamqizhao committed Dec 1, 2016
1 parent cc3363f commit deb01f4
Show file tree
Hide file tree
Showing 8 changed files with 574 additions and 150 deletions.
9 changes: 5 additions & 4 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
// Fix the order if necessary.
stats.Handle(ctx, inPayload)
stats.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
return nil
Expand Down Expand Up @@ -121,7 +121,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
err = t.Write(stream, outBuf, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
stats.Handle(ctx, outPayload)
stats.HandleRPC(ctx, outPayload)
}
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
Expand Down Expand Up @@ -172,12 +172,13 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}()
}
if stats.On() {
ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
stats.Handle(ctx, begin)
stats.HandleRPC(ctx, begin)
}
defer func() {
if stats.On() {
Expand All @@ -186,7 +187,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
EndTime: time.Now(),
Error: e,
}
stats.Handle(ctx, end)
stats.HandleRPC(ctx, end)
}
}()
topts := &transport.Options{
Expand Down
12 changes: 6 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
err = t.Write(stream, p, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
stats.Handle(stream.Context(), outPayload)
stats.HandleRPC(stream.Context(), outPayload)
}
return err
}
Expand All @@ -593,7 +593,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
begin := &stats.Begin{
BeginTime: time.Now(),
}
stats.Handle(stream.Context(), begin)
stats.HandleRPC(stream.Context(), begin)
}
defer func() {
if stats.On() {
Expand All @@ -603,7 +603,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
stats.Handle(stream.Context(), end)
stats.HandleRPC(stream.Context(), end)
}
}()
if trInfo != nil {
Expand Down Expand Up @@ -698,7 +698,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
inPayload.Payload = v
inPayload.Data = req
inPayload.Length = len(req)
stats.Handle(stream.Context(), inPayload)
stats.HandleRPC(stream.Context(), inPayload)
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
Expand Down Expand Up @@ -759,7 +759,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
begin := &stats.Begin{
BeginTime: time.Now(),
}
stats.Handle(stream.Context(), begin)
stats.HandleRPC(stream.Context(), begin)
}
defer func() {
if stats.On() {
Expand All @@ -769,7 +769,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
stats.Handle(stream.Context(), end)
stats.HandleRPC(stream.Context(), end)
}
}()
if s.opts.cp != nil {
Expand Down
152 changes: 152 additions & 0 deletions stats/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/

package stats

import (
"net"
"sync/atomic"

"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
)

// ConnTagInfo defines the relevant information needed by connection context tagger.
type ConnTagInfo struct {
// RemoteAddr is the remote address of the corresponding connection.
RemoteAddr net.Addr
// LocalAddr is the local address of the corresponding connection.
LocalAddr net.Addr
// TODO add QOS related fields.
}

// RPCTagInfo defines the relevant information needed by RPC context tagger.
type RPCTagInfo struct {
// FullMethodName is the RPC method in the format of /package.service/method.
FullMethodName string
}

var (
on = new(int32)
rpcHandler func(context.Context, RPCStats)
connHandler func(context.Context, ConnStats)
connTagger func(context.Context, *ConnTagInfo) context.Context
rpcTagger func(context.Context, *RPCTagInfo) context.Context
)

// HandleRPC processes the RPC stats using the rpc handler registered by the user.
func HandleRPC(ctx context.Context, s RPCStats) {
if rpcHandler == nil {
return
}
rpcHandler(ctx, s)
}

// RegisterRPCHandler registers the user handler function for RPC stats processing.
// It should be called only once. The later call will overwrite the former value if it is called multiple times.
// This handler function will be called to process the rpc stats.
func RegisterRPCHandler(f func(context.Context, RPCStats)) {
rpcHandler = f
}

// HandleConn processes the stats using the call back function registered by user.
func HandleConn(ctx context.Context, s ConnStats) {
if connHandler == nil {
return
}
connHandler(ctx, s)
}

// RegisterConnHandler registers the user handler function for conn stats.
// It should be called only once. The later call will overwrite the former value if it is called multiple times.
// This handler function will be called to process the conn stats.
func RegisterConnHandler(f func(context.Context, ConnStats)) {
connHandler = f
}

// TagConn calls user registered connection context tagger.
func TagConn(ctx context.Context, info *ConnTagInfo) context.Context {
if connTagger == nil {
return ctx
}
return connTagger(ctx, info)
}

// RegisterConnTagger registers the user connection context tagger function.
// The connection context tagger can attach some information to the given context.
// The returned context will be used for stats handling.
// For conn stats handling, the context used in connHandler for this
// connection will be derived from the context returned.
// For RPC stats handling,
// - On server side, the context used in rpcHandler for all RPCs on this
// connection will be derived from the context returned.
// - On client side, the context is not derived from the context returned.
func RegisterConnTagger(t func(context.Context, *ConnTagInfo) context.Context) {
connTagger = t
}

// TagRPC calls the user registered RPC context tagger.
func TagRPC(ctx context.Context, info *RPCTagInfo) context.Context {
if rpcTagger == nil {
return ctx
}
return rpcTagger(ctx, info)
}

// RegisterRPCTagger registers the user RPC context tagger function.
// The RPC context tagger can attach some information to the given context.
// The context used in stats rpcHandler for this RPC will be derived from the
// context returned.
func RegisterRPCTagger(t func(context.Context, *RPCTagInfo) context.Context) {
rpcTagger = t
}

// Start starts the stats collection and processing if there is a registered stats handle.
func Start() {
if rpcHandler == nil && connHandler == nil {
grpclog.Println("rpcHandler and connHandler are both nil when starting stats. Stats is not started")
return
}
atomic.StoreInt32(on, 1)
}

// Stop stops the stats collection and processing.
// Stop does not unregister the handlers.
func Stop() {
atomic.StoreInt32(on, 0)
}

// On indicates whether the stats collection and processing is on.
func On() bool {
return atomic.CompareAndSwapInt32(on, 1, 1)
}
70 changes: 37 additions & 33 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,12 @@ package stats // import "google.golang.org/grpc/stats"

import (
"net"
"sync/atomic"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
)

// RPCStats contains stats information about RPCs.
// All stats types in this package implements this interface.
type RPCStats interface {
isRPCStats()
// IsClient returns true if this RPCStats is from client side.
IsClient() bool
}
Expand All @@ -66,6 +62,8 @@ type Begin struct {
// IsClient indicates if this is from client side.
func (s *Begin) IsClient() bool { return s.Client }

func (s *Begin) isRPCStats() {}

// InPayload contains the information for an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
Expand All @@ -85,6 +83,8 @@ type InPayload struct {
// IsClient indicates if this is from client side.
func (s *InPayload) IsClient() bool { return s.Client }

func (s *InPayload) isRPCStats() {}

// InHeader contains stats when a header is received.
// FullMethod, addresses and Compression are only valid if Client is false.
type InHeader struct {
Expand All @@ -106,6 +106,8 @@ type InHeader struct {
// IsClient indicates if this is from client side.
func (s *InHeader) IsClient() bool { return s.Client }

func (s *InHeader) isRPCStats() {}

// InTrailer contains stats when a trailer is received.
type InTrailer struct {
// Client is true if this InTrailer is from client side.
Expand All @@ -117,6 +119,8 @@ type InTrailer struct {
// IsClient indicates if this is from client side.
func (s *InTrailer) IsClient() bool { return s.Client }

func (s *InTrailer) isRPCStats() {}

// OutPayload contains the information for an outgoing payload.
type OutPayload struct {
// Client is true if this OutPayload is from client side.
Expand All @@ -136,6 +140,8 @@ type OutPayload struct {
// IsClient indicates if this is from client side.
func (s *OutPayload) IsClient() bool { return s.Client }

func (s *OutPayload) isRPCStats() {}

// OutHeader contains stats when a header is sent.
// FullMethod, addresses and Compression are only valid if Client is true.
type OutHeader struct {
Expand All @@ -157,6 +163,8 @@ type OutHeader struct {
// IsClient indicates if this is from client side.
func (s *OutHeader) IsClient() bool { return s.Client }

func (s *OutHeader) isRPCStats() {}

// OutTrailer contains stats when a trailer is sent.
type OutTrailer struct {
// Client is true if this OutTrailer is from client side.
Expand All @@ -168,6 +176,8 @@ type OutTrailer struct {
// IsClient indicates if this is from client side.
func (s *OutTrailer) IsClient() bool { return s.Client }

func (s *OutTrailer) isRPCStats() {}

// End contains stats when an RPC ends.
type End struct {
// Client is true if this End is from client side.
Expand All @@ -181,39 +191,33 @@ type End struct {
// IsClient indicates if this is from client side.
func (s *End) IsClient() bool { return s.Client }

var (
on = new(int32)
handler func(context.Context, RPCStats)
)
func (s *End) isRPCStats() {}

// On indicates whether stats is started.
func On() bool {
return atomic.CompareAndSwapInt32(on, 1, 1)
// ConnStats contains stats information about connections.
type ConnStats interface {
isConnStats()
// IsClient returns true if this ConnStats is from client side.
IsClient() bool
}

// Handle processes the stats using the call back function registered by user.
func Handle(ctx context.Context, s RPCStats) {
handler(ctx, s)
// ConnBegin contains the stats of a connection when it is established.
type ConnBegin struct {
// Client is true if this ConnBegin is from client side.
Client bool
}

// RegisterHandler registers the user handler function.
// If another handler was registered before, this new handler will overwrite the old one.
// This handler function will be called to process the stats.
func RegisterHandler(f func(context.Context, RPCStats)) {
handler = f
}
// IsClient indicates if this is from client side.
func (s *ConnBegin) IsClient() bool { return s.Client }

// Start starts the stats collection and reporting if there is a registered stats handle.
func Start() {
if handler == nil {
grpclog.Println("handler is nil when starting stats. Stats is not started")
return
}
atomic.StoreInt32(on, 1)
}
func (s *ConnBegin) isConnStats() {}

// Stop stops the stats collection and processing.
// Stop does not unregister handler.
func Stop() {
atomic.StoreInt32(on, 0)
// ConnEnd contains the stats of a connection when it ends.
type ConnEnd struct {
// Client is true if this ConnEnd is from client side.
Client bool
}

// IsClient indicates if this is from client side.
func (s *ConnEnd) IsClient() bool { return s.Client }

func (s *ConnEnd) isConnStats() {}
Loading

0 comments on commit deb01f4

Please sign in to comment.