diff --git a/core/comm/config.go b/core/comm/config.go index c8af5ebc195..33cef277b8a 100644 --- a/core/comm/config.go +++ b/core/comm/config.go @@ -23,19 +23,12 @@ var ( maxRecvMsgSize = 100 * 1024 * 1024 maxSendMsgSize = 100 * 1024 * 1024 // Default peer keepalive options - keepaliveOptions = KeepaliveOptions{ - ClientKeepaliveTime: 60, // 1 min - ClientKeepaliveTimeout: 20, // 20 sec - gRPC default - ServerKeepaliveTime: 7200, // 2 hours - gRPC default - ServerKeepaliveTimeout: 20, // 20 sec - gRPC default - } - // chaincode keepalive options separate from peer keepalive - // options above (for flexibility) - chaincodeKeepaliveOptions = KeepaliveOptions{ - ClientKeepaliveTime: 60, // 1 min - ClientKeepaliveTimeout: 20, // 20 sec - gRPC default - ServerKeepaliveTime: 60, // 1 min - ServerKeepaliveTimeout: 20, // 20 sec - gRPC default + keepaliveOptions = &KeepaliveOptions{ + ClientInterval: time.Duration(1) * time.Minute, // 1 min + ClientTimeout: time.Duration(20) * time.Second, // 20 sec - gRPC default + ServerInterval: time.Duration(2) * time.Hour, // 2 hours - gRPC default + ServerTimeout: time.Duration(20) * time.Second, // 20 sec - gRPC default + ServerMinInterval: time.Duration(1) * time.Minute, // match ClientInterval } ) @@ -69,20 +62,21 @@ type SecureOptions struct { // KeepAliveOptions is used to set the gRPC keepalive settings for both // clients and servers type KeepaliveOptions struct { - // ClientKeepaliveTime is the duration in seconds after which if the client - // does not see any activity from the server it pings the server to see - // if it is alive - ClientKeepaliveTime int - // ClientKeepaliveTimeout is the duration the client waits for a response + // ClientInterval is the duration after which if the client does not see + // any activity from the server it pings the server to see if it is alive + ClientInterval time.Duration + // ClientTimeout is the duration the client waits for a response // from the server after sending a ping before closing the connection - ClientKeepaliveTimeout int - // ServerKeepaliveTime is the duration in seconds after which if the server - // does not see any activity from the client it pings the client to see - // if it is alive - ServerKeepaliveTime int - // ServerKeepaliveTimeout is the duration the server waits for a response + ClientTimeout time.Duration + // ServerInterval is the duration after which if the server does not see + // any activity from the client it pings the client to see if it is alive + ServerInterval time.Duration + // ServerTimeout is the duration the server waits for a response // from the client after sending a ping before closing the connection - ServerKeepaliveTimeout int + ServerTimeout time.Duration + // ServerMinInterval is the minimum permitted time between client pings. + // If clients send pings more frequently, the server will disconnect them + ServerMinInterval time.Duration } // cacheConfiguration caches common package scoped variables @@ -125,27 +119,21 @@ func SetMaxSendMsgSize(size int) { maxSendMsgSize = size } -// SetKeepaliveOptions sets the gRPC keepalive options for both clients and -// servers -func SetKeepaliveOptions(ka KeepaliveOptions) { - keepaliveOptions = ka -} - -// ServerKeepaliveOptions returns the gRPC keepalive options for servers -func ServerKeepaliveOptions() []grpc.ServerOption { - return serverKeepaliveOptionsWithKa(&keepaliveOptions) -} - -func serverKeepaliveOptionsWithKa(ka *KeepaliveOptions) []grpc.ServerOption { +// ServerKeepaliveOptions returns gRPC keepalive options for server. If +// opts is nil, the default keepalive options are returned +func ServerKeepaliveOptions(ka *KeepaliveOptions) []grpc.ServerOption { + // use default keepalive options if nil + if ka == nil { + ka = keepaliveOptions + } var serverOpts []grpc.ServerOption kap := keepalive.ServerParameters{ - Time: time.Duration(ka.ServerKeepaliveTime) * time.Second, - Timeout: time.Duration(ka.ServerKeepaliveTimeout) * time.Second, + Time: ka.ServerInterval, + Timeout: ka.ServerTimeout, } serverOpts = append(serverOpts, grpc.KeepaliveParams(kap)) kep := keepalive.EnforcementPolicy{ - // needs to match clientKeepalive - MinTime: time.Duration(ka.ClientKeepaliveTime) * time.Second, + MinTime: ka.ServerMinInterval, // allow keepalive w/o rpc PermitWithoutStream: true, } @@ -153,16 +141,18 @@ func serverKeepaliveOptionsWithKa(ka *KeepaliveOptions) []grpc.ServerOption { return serverOpts } -// ClientKeepaliveOptions returns the gRPC keepalive options for clients -func ClientKeepaliveOptions() []grpc.DialOption { - return clientKeepaliveOptionsWithKa(&keepaliveOptions) -} +// ClientKeepaliveOptions returns gRPC keepalive options for clients. If +// opts is nil, the default keepalive options are returned +func ClientKeepaliveOptions(ka *KeepaliveOptions) []grpc.DialOption { + // use default keepalive options if nil + if ka == nil { + ka = keepaliveOptions + } -func clientKeepaliveOptionsWithKa(ka *KeepaliveOptions) []grpc.DialOption { var dialOpts []grpc.DialOption kap := keepalive.ClientParameters{ - Time: time.Duration(ka.ClientKeepaliveTime) * time.Second, - Timeout: time.Duration(ka.ClientKeepaliveTimeout) * time.Second, + Time: ka.ClientInterval, + Timeout: ka.ClientTimeout, PermitWithoutStream: true, } dialOpts = append(dialOpts, grpc.WithKeepaliveParams(kap)) diff --git a/core/comm/config_test.go b/core/comm/config_test.go index b3bc4807402..55dd545ca9d 100644 --- a/core/comm/config_test.go +++ b/core/comm/config_test.go @@ -27,22 +27,6 @@ func TestConfig(t *testing.T) { assert.EqualValues(t, size, MaxRecvMsgSize()) assert.EqualValues(t, size, MaxSendMsgSize()) - // set keepalive options - timeout := 1000 - ka := KeepaliveOptions{ - ClientKeepaliveTime: timeout, - ClientKeepaliveTimeout: timeout + 1, - ServerKeepaliveTime: timeout + 2, - ServerKeepaliveTimeout: timeout + 3, - } - SetKeepaliveOptions(ka) - assert.EqualValues(t, timeout, keepaliveOptions.ClientKeepaliveTime) - assert.EqualValues(t, timeout+1, keepaliveOptions.ClientKeepaliveTimeout) - assert.EqualValues(t, timeout+2, keepaliveOptions.ServerKeepaliveTime) - assert.EqualValues(t, timeout+3, keepaliveOptions.ServerKeepaliveTimeout) - assert.EqualValues(t, 2, len(ServerKeepaliveOptions())) - assert.Equal(t, 1, len(ClientKeepaliveOptions())) - // reset cache configurationCached = false viper.Set("peer.tls.enabled", true) diff --git a/core/comm/connection.go b/core/comm/connection.go index 4740c18b4a6..d534b69dce1 100644 --- a/core/comm/connection.go +++ b/core/comm/connection.go @@ -181,18 +181,15 @@ func GetPeerTestingAddress(port string) string { } // NewClientConnectionWithAddress Returns a new grpc.ClientConn to the given address -func NewClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool, creds credentials.TransportCredentials) (*grpc.ClientConn, error) { - return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, nil) +func NewClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool, + creds credentials.TransportCredentials, ka *KeepaliveOptions) (*grpc.ClientConn, error) { + return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, ka) } // NewChaincodeClientConnectionWithAddress Returns a new chaincode type grpc.ClientConn to the given address func NewChaincodeClientConnectionWithAddress(peerAddress string, block bool, tslEnabled bool, creds credentials.TransportCredentials) (*grpc.ClientConn, error) { - ka := chaincodeKeepaliveOptions - //client side's keepalive parameter better be greater than EnforcementPolicies MinTime - //to prevent server killing the connection due to timing issues. Just increase by a min - ka.ClientKeepaliveTime += 60 - - return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, &ka) + ka := &KeepaliveOptions{} + return newClientConnectionWithAddressWithKa(peerAddress, block, tslEnabled, creds, ka) } // newClientConnectionWithAddressWithKa Returns a new grpc.ClientConn to the given address using specied keepalive options @@ -203,7 +200,7 @@ func newClientConnectionWithAddressWithKa(peerAddress string, block bool, tslEna //want to change this in future to have peer client //send keepalives too if ka != nil { - opts = clientKeepaliveOptionsWithKa(ka) + opts = ClientKeepaliveOptions(ka) } if tslEnabled { diff --git a/core/comm/connection_test.go b/core/comm/connection_test.go index 8e3a892f7ea..059d2269ded 100644 --- a/core/comm/connection_test.go +++ b/core/comm/connection_test.go @@ -53,9 +53,11 @@ func TestConnection_Correct(t *testing.T) { var tmpConn *grpc.ClientConn var err error if TLSEnabled() { - tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true, InitTLSForPeer()) + tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true, + InitTLSForPeer(), nil) } - tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false, nil) + tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false, + nil, nil) if err != nil { t.Fatalf("error connection to server at host:port = %s\n", peerAddress) } @@ -88,9 +90,11 @@ func TestConnection_WrongAddress(t *testing.T) { var tmpConn *grpc.ClientConn var err error if TLSEnabled() { - tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true, InitTLSForPeer()) + tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, true, + InitTLSForPeer(), nil) } - tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false, nil) + tmpConn, err = NewClientConnectionWithAddress(peerAddress, true, false, + nil, nil) if err == nil { fmt.Printf("error connection to server - at host:port = %s\n", peerAddress) t.Error("error connection to server - connection should fail") diff --git a/core/comm/server.go b/core/comm/server.go index 42cbdc632d6..6b2c9c6016c 100644 --- a/core/comm/server.go +++ b/core/comm/server.go @@ -17,86 +17,67 @@ import ( "google.golang.org/grpc" ) -//GRPCServer defines an interface representing a GRPC-based server +// GRPCServer defines an interface representing a GRPC-based server type GRPCServer interface { - //Address returns the listen address for the GRPCServer + // Address returns the listen address for the GRPCServer Address() string - //Start starts the underlying grpc.Server + // Start starts the underlying grpc.Server Start() error - //Stop stops the underlying grpc.Server + // Stop stops the underlying grpc.Server Stop() - //Server returns the grpc.Server instance for the GRPCServer + // Server returns the grpc.Server instance for the GRPCServer Server() *grpc.Server - //Listener returns the net.Listener instance for the GRPCServer + // Listener returns the net.Listener instance for the GRPCServer Listener() net.Listener - //ServerCertificate returns the tls.Certificate used by the grpc.Server + // ServerCertificate returns the tls.Certificate used by the grpc.Server ServerCertificate() tls.Certificate - //TLSEnabled is a flag indicating whether or not TLS is enabled for this - //GRPCServer instance + // TLSEnabled is a flag indicating whether or not TLS is enabled for this + // GRPCServer instance TLSEnabled() bool - //MutualTLSRequired is a flag indicating whether or not client certificates - //are required for this GRPCServer instance + // MutualTLSRequired is a flag indicating whether or not client certificates + // are required for this GRPCServer instance MutualTLSRequired() bool - //AppendClientRootCAs appends PEM-encoded X509 certificate authorities to - //the list of authorities used to verify client certificates + // AppendClientRootCAs appends PEM-encoded X509 certificate authorities to + // the list of authorities used to verify client certificates AppendClientRootCAs(clientRoots [][]byte) error - //RemoveClientRootCAs removes PEM-encoded X509 certificate authorities from - //the list of authorities used to verify client certificates + // RemoveClientRootCAs removes PEM-encoded X509 certificate authorities from + // the list of authorities used to verify client certificates RemoveClientRootCAs(clientRoots [][]byte) error - //SetClientRootCAs sets the list of authorities used to verify client - //certificates based on a list of PEM-encoded X509 certificate authorities + // SetClientRootCAs sets the list of authorities used to verify client + // certificates based on a list of PEM-encoded X509 certificate authorities SetClientRootCAs(clientRoots [][]byte) error } type grpcServerImpl struct { - //Listen address for the server specified as hostname:port + // Listen address for the server specified as hostname:port address string - //Listener for handling network requests + // Listener for handling network requests listener net.Listener - //GRPC server + // GRPC server server *grpc.Server - //Certificate presented by the server for TLS communication + // Certificate presented by the server for TLS communication serverCertificate tls.Certificate - //Key used by the server for TLS communication + // Key used by the server for TLS communication serverKeyPEM []byte - //List of certificate authorities to optionally pass to the client during - //the TLS handshake + // List of certificate authorities to optionally pass to the client during + // the TLS handshake serverRootCAs []tls.Certificate - //lock to protect concurrent access to append / remove + // lock to protect concurrent access to append / remove lock *sync.Mutex - //Set of PEM-encoded X509 certificate authorities used to populate - //the tlsConfig.ClientCAs indexed by subject + // Set of PEM-encoded X509 certificate authorities used to populate + // the tlsConfig.ClientCAs indexed by subject clientRootCAs map[string]*x509.Certificate - //TLS configuration used by the grpc server + // TLS configuration used by the grpc server tlsConfig *tls.Config - //Is TLS enabled? + // Is TLS enabled? tlsEnabled bool - //Are client certifictes required + // Are client certifictes required mutualTLSRequired bool } -//NewGRPCServer creates a new implementation of a GRPCServer given a -//listen address +// NewGRPCServer creates a new implementation of a GRPCServer given a +// listen address func NewGRPCServer(address string, serverConfig ServerConfig) (GRPCServer, error) { - return newGRPCServerWithKa(address, serverConfig, &keepaliveOptions) -} - -//NewChaincodeGRPCServer creates a new implementation of a chaincode GRPCServer given a -//listen address -func NewChaincodeGRPCServer(address string, serverConfig ServerConfig) (GRPCServer, error) { - return newGRPCServerWithKa(address, serverConfig, &chaincodeKeepaliveOptions) -} - -//NewGRPCServerFromListener creates a new implementation of a GRPCServer given -//an existing net.Listener instance using default keepalive -func NewGRPCServerFromListener(listener net.Listener, serverConfig ServerConfig) (GRPCServer, error) { - return newGRPCServerFromListenerWithKa(listener, serverConfig, &keepaliveOptions) -} - -//newGRPCServerWithKa creates a new implementation of a GRPCServer given a -//listen address with specified keepalive options -func newGRPCServerWithKa(address string, serverConfig ServerConfig, ka *KeepaliveOptions) (GRPCServer, error) { - if address == "" { return nil, errors.New("Missing address parameter") } @@ -106,15 +87,12 @@ func newGRPCServerWithKa(address string, serverConfig ServerConfig, ka *Keepaliv if err != nil { return nil, err } - - return newGRPCServerFromListenerWithKa(lis, serverConfig, ka) - + return NewGRPCServerFromListener(lis, serverConfig) } -//newGRPCServerFromListenerWithKa creates a new implementation of a GRPCServer given -//an existing net.Listener instance with specfied keepalive -func newGRPCServerFromListenerWithKa(listener net.Listener, serverConfig ServerConfig, - ka *KeepaliveOptions) (GRPCServer, error) { +// NewGRPCServerFromListener creates a new implementation of a GRPCServer given +// an existing net.Listener instance using default keepalive +func NewGRPCServerFromListener(listener net.Listener, serverConfig ServerConfig) (GRPCServer, error) { grpcServer := &grpcServerImpl{ address: listener.Addr().String(), listener: listener, @@ -175,57 +153,57 @@ func newGRPCServerFromListenerWithKa(listener net.Listener, serverConfig ServerC serverOpts = append(serverOpts, grpc.MaxSendMsgSize(MaxSendMsgSize())) serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(MaxRecvMsgSize())) // set the keepalive options - serverOpts = append(serverOpts, serverKeepaliveOptionsWithKa(ka)...) + serverOpts = append(serverOpts, ServerKeepaliveOptions(serverConfig.KaOpts)...) grpcServer.server = grpc.NewServer(serverOpts...) return grpcServer, nil } -//Address returns the listen address for this GRPCServer instance +// Address returns the listen address for this GRPCServer instance func (gServer *grpcServerImpl) Address() string { return gServer.address } -//Listener returns the net.Listener for the GRPCServer instance +// Listener returns the net.Listener for the GRPCServer instance func (gServer *grpcServerImpl) Listener() net.Listener { return gServer.listener } -//Server returns the grpc.Server for the GRPCServer instance +// Server returns the grpc.Server for the GRPCServer instance func (gServer *grpcServerImpl) Server() *grpc.Server { return gServer.server } -//ServerCertificate returns the tls.Certificate used by the grpc.Server +// ServerCertificate returns the tls.Certificate used by the grpc.Server func (gServer *grpcServerImpl) ServerCertificate() tls.Certificate { return gServer.serverCertificate } -//TLSEnabled is a flag indicating whether or not TLS is enabled for the -//GRPCServer instance +// TLSEnabled is a flag indicating whether or not TLS is enabled for the +// GRPCServer instance func (gServer *grpcServerImpl) TLSEnabled() bool { return gServer.tlsEnabled } -//MutualTLSRequired is a flag indicating whether or not client certificates -//are required for this GRPCServer instance +// MutualTLSRequired is a flag indicating whether or not client certificates +// are required for this GRPCServer instance func (gServer *grpcServerImpl) MutualTLSRequired() bool { return gServer.mutualTLSRequired } -//Start starts the underlying grpc.Server +// Start starts the underlying grpc.Server func (gServer *grpcServerImpl) Start() error { return gServer.server.Serve(gServer.listener) } -//Stop stops the underlying grpc.Server +// Stop stops the underlying grpc.Server func (gServer *grpcServerImpl) Stop() { gServer.server.Stop() } -//AppendClientRootCAs appends PEM-encoded X509 certificate authorities to -//the list of authorities used to verify client certificates +// AppendClientRootCAs appends PEM-encoded X509 certificate authorities to +// the list of authorities used to verify client certificates func (gServer *grpcServerImpl) AppendClientRootCAs(clientRoots [][]byte) error { gServer.lock.Lock() defer gServer.lock.Unlock() @@ -238,7 +216,7 @@ func (gServer *grpcServerImpl) AppendClientRootCAs(clientRoots [][]byte) error { return nil } -//internal function to add a PEM-encoded clientRootCA +// internal function to add a PEM-encoded clientRootCA func (gServer *grpcServerImpl) appendClientRootCA(clientRoot []byte) error { errMsg := "Failed to append client root certificate(s): %s" @@ -261,8 +239,8 @@ func (gServer *grpcServerImpl) appendClientRootCA(clientRoot []byte) error { return nil } -//RemoveClientRootCAs removes PEM-encoded X509 certificate authorities from -//the list of authorities used to verify client certificates +// RemoveClientRootCAs removes PEM-encoded X509 certificate authorities from +// the list of authorities used to verify client certificates func (gServer *grpcServerImpl) RemoveClientRootCAs(clientRoots [][]byte) error { gServer.lock.Lock() defer gServer.lock.Unlock() @@ -285,7 +263,7 @@ func (gServer *grpcServerImpl) RemoveClientRootCAs(clientRoots [][]byte) error { return nil } -//internal function to remove a PEM-encoded clientRootCA +// internal function to remove a PEM-encoded clientRootCA func (gServer *grpcServerImpl) removeClientRootCA(clientRoot []byte) error { errMsg := "Failed to remove client root certificate(s): %s" @@ -309,8 +287,8 @@ func (gServer *grpcServerImpl) removeClientRootCA(clientRoot []byte) error { return nil } -//SetClientRootCAs sets the list of authorities used to verify client -//certificates based on a list of PEM-encoded X509 certificate authorities +// SetClientRootCAs sets the list of authorities used to verify client +// certificates based on a list of PEM-encoded X509 certificate authorities func (gServer *grpcServerImpl) SetClientRootCAs(clientRoots [][]byte) error { gServer.lock.Lock() defer gServer.lock.Unlock() diff --git a/core/comm/server_test.go b/core/comm/server_test.go index f3a7250adb8..a1858c98d36 100644 --- a/core/comm/server_test.go +++ b/core/comm/server_test.go @@ -1373,13 +1373,12 @@ func TestSetClientRootCAs(t *testing.T) { func TestKeepaliveNoClientResponse(t *testing.T) { t.Parallel() // set up GRPCServer instance - kap := comm.KeepaliveOptions{ - ServerKeepaliveTime: 2, - ServerKeepaliveTimeout: 1, + kap := &comm.KeepaliveOptions{ + ServerInterval: time.Duration(2) * time.Second, + ServerTimeout: time.Duration(1) * time.Second, } - comm.SetKeepaliveOptions(kap) testAddress := "localhost:9400" - srv, err := comm.NewGRPCServer(testAddress, comm.ServerConfig{}) + srv, err := comm.NewGRPCServer(testAddress, comm.ServerConfig{KaOpts: kap}) assert.NoError(t, err, "Unexpected error starting GRPCServer") go srv.Start() defer srv.Stop() @@ -1405,13 +1404,12 @@ func TestKeepaliveNoClientResponse(t *testing.T) { func TestKeepaliveClientResponse(t *testing.T) { t.Parallel() // set up GRPCServer instance - kap := comm.KeepaliveOptions{ - ServerKeepaliveTime: 2, - ServerKeepaliveTimeout: 1, + kap := &comm.KeepaliveOptions{ + ServerInterval: time.Duration(2) * time.Second, + ServerTimeout: time.Duration(1) * time.Second, } - comm.SetKeepaliveOptions(kap) testAddress := "localhost:9401" - srv, err := comm.NewGRPCServer(testAddress, comm.ServerConfig{}) + srv, err := comm.NewGRPCServer(testAddress, comm.ServerConfig{KaOpts: kap}) assert.NoError(t, err, "Unexpected error starting GRPCServer") go srv.Start() defer srv.Stop() diff --git a/core/deliverservice/deliveryclient.go b/core/deliverservice/deliveryclient.go index 84ff3dd7fc7..0d3da9740cb 100644 --- a/core/deliverservice/deliveryclient.go +++ b/core/deliverservice/deliveryclient.go @@ -226,7 +226,7 @@ func DefaultConnectionFactory(channelID string) func(endpoint string) (*grpc.Cli dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) // set the keepalive options - dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) + dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(nil)...) if comm.TLSEnabled() { creds, err := comm.GetCredentialSupport().GetDeliverServiceCredentials(channelID) diff --git a/core/peer/peer.go b/core/peer/peer.go index 5cf04716c8f..24e01714364 100644 --- a/core/peer/peer.go +++ b/core/peer/peer.go @@ -629,9 +629,11 @@ func GetLocalIP() string { // NewPeerClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER. func NewPeerClientConnectionWithAddress(peerAddress string) (*grpc.ClientConn, error) { if comm.TLSEnabled() { - return comm.NewClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForPeer()) + return comm.NewClientConnectionWithAddress(peerAddress, true, true, + comm.InitTLSForPeer(), nil) } - return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil) + return comm.NewClientConnectionWithAddress(peerAddress, true, false, + nil, nil) } // GetChannelsInfo returns an array with information about all channels for diff --git a/events/consumer/consumer.go b/events/consumer/consumer.go index 0a871cf7dea..aeb6fd543df 100644 --- a/events/consumer/consumer.go +++ b/events/consumer/consumer.go @@ -69,9 +69,11 @@ func NewEventsClient(peerAddress string, regTimeout time.Duration, adapter Event //newEventsClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER. func newEventsClientConnectionWithAddress(peerAddress string) (*grpc.ClientConn, error) { if comm.TLSEnabled() { - return comm.NewClientConnectionWithAddress(peerAddress, true, true, comm.InitTLSForPeer()) + return comm.NewClientConnectionWithAddress(peerAddress, true, true, + comm.InitTLSForPeer(), nil) } - return comm.NewClientConnectionWithAddress(peerAddress, true, false, nil) + return comm.NewClientConnectionWithAddress(peerAddress, true, false, + nil, nil) } func (ec *EventsClient) send(emsg *ehpb.Event) error { diff --git a/events/producer/events_test.go b/events/producer/events_test.go index a60c9b0f21f..58d893069ca 100644 --- a/events/producer/events_test.go +++ b/events/producer/events_test.go @@ -39,7 +39,8 @@ type client struct { } func newClient() *client { - conn, err := comm.NewClientConnectionWithAddress(peerAddress, true, false, nil) + conn, err := comm.NewClientConnectionWithAddress(peerAddress, true, false, + nil, nil) if err != nil { panic(err) } diff --git a/peer/node/start.go b/peer/node/start.go index fee3670ddf4..227521f9617 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -207,7 +207,7 @@ func serve(args []string) error { dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) // set the keepalive options - dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) + dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(nil)...) if comm.TLSEnabled() { comm.GetCredentialSupport().ClientCert = peerServer.ServerCertificate() @@ -319,7 +319,15 @@ func createChaincodeServer(caCert []byte, peerHostname string) (comm.GRPCServer, config.SecOpts.ClientRootCAs = append(config.SecOpts.ClientRootCAs, caCert) } - srv, err = comm.NewChaincodeGRPCServer(cclistenAddress, config) + // Chaincode keepalive options - static for now + chaincodeKeepaliveOptions := &comm.KeepaliveOptions{ + ServerInterval: time.Duration(2) * time.Hour, // 2 hours - gRPC default + ServerTimeout: time.Duration(20) * time.Second, // 20 sec - gRPC default + ServerMinInterval: time.Duration(1) * time.Minute, // match ClientInterval + } + config.KaOpts = chaincodeKeepaliveOptions + + srv, err = comm.NewGRPCServer(cclistenAddress, config) if err != nil { panic(err) }