Skip to content

Commit

Permalink
Adds name to route, and writes it in every error message (#1777)
Browse files Browse the repository at this point in the history
* Adds name to route, and writes it in every error message

* Update all calls with route name

* Fixed a few missed points

Co-authored-by: stasatdaglabs <39559713+stasatdaglabs@users.noreply.github.com>
  • Loading branch information
svarogg and stasatdaglabs authored Jul 4, 2021
1 parent 61aa15f commit 069ee26
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 33 deletions.
4 changes: 2 additions & 2 deletions app/protocol/flows/testing/handle_relay_invs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1534,8 +1534,8 @@ func TestHandleRelayInvs(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
t.Parallel()

incomingRoute := router.NewRoute()
outgoingRoute := router.NewRoute()
incomingRoute := router.NewRoute("incoming")
outgoingRoute := router.NewRoute("outgoing")
peer := peerpkg.New(nil)
errChan := make(chan error)
context := &fakeRelayInvsContext{
Expand Down
4 changes: 2 additions & 2 deletions app/protocol/flows/testing/receiveaddresses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func (f fakeReceiveAddressesContext) AddressManager() *addressmanager.AddressMan

func TestReceiveAddressesErrors(t *testing.T) {
testutils.ForAllNets(t, true, func(t *testing.T, consensusConfig *consensus.Config) {
incomingRoute := router.NewRoute()
outgoingRoute := router.NewRoute()
incomingRoute := router.NewRoute("incoming")
outgoingRoute := router.NewRoute("outgoing")
peer := peerpkg.New(nil)
errChan := make(chan error)
go func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func TestHandleRelayedTransactionsNotFound(t *testing.T) {
domain: domainInstance,
sharedRequestedTransactions: sharedRequestedTransactions,
}
incomingRoute := router.NewRoute()
incomingRoute := router.NewRoute("incoming")
defer incomingRoute.Close()
peerIncomingRoute := router.NewRoute()
peerIncomingRoute := router.NewRoute("outgoing")
defer peerIncomingRoute.Close()

txID1 := externalapi.NewDomainTransactionIDFromByteArray(&[externalapi.DomainHashSize]byte{
Expand Down Expand Up @@ -167,8 +167,8 @@ func TestOnClosedIncomingRoute(t *testing.T) {
domain: domainInstance,
sharedRequestedTransactions: sharedRequestedTransactions,
}
incomingRoute := router.NewRoute()
outgoingRoute := router.NewRoute()
incomingRoute := router.NewRoute("incoming")
outgoingRoute := router.NewRoute("outgoing")
defer outgoingRoute.Close()

txID := externalapi.NewDomainTransactionIDFromByteArray(&[externalapi.DomainHashSize]byte{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func TestHandleRequestedTransactionsNotFound(t *testing.T) {
domain: domainInstance,
sharedRequestedTransactions: sharedRequestedTransactions,
}
incomingRoute := router.NewRoute()
outgoingRoute := router.NewRoute()
incomingRoute := router.NewRoute("incoming")
outgoingRoute := router.NewRoute("outgoing")
defer outgoingRoute.Close()

txID1 := externalapi.NewDomainTransactionIDFromByteArray(&[externalapi.DomainHashSize]byte{
Expand Down
10 changes: 5 additions & 5 deletions app/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (m *Manager) registerRejectsFlow(router *routerpkg.Router, isStopping *uint
func (m *Manager) registerFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc flowInitializeFunc) *flow {

route, err := router.AddIncomingRoute(messageTypes)
route, err := router.AddIncomingRoute(name, messageTypes)
if err != nil {
panic(err)
}
Expand All @@ -294,7 +294,7 @@ func (m *Manager) registerFlowWithCapacity(name string, capacity int, router *ro
messageTypes []appmessage.MessageCommand, isStopping *uint32,
errChan chan error, initializeFunc flowInitializeFunc) *flow {

route, err := router.AddIncomingRouteWithCapacity(capacity, messageTypes)
route, err := router.AddIncomingRouteWithCapacity(name, capacity, messageTypes)
if err != nil {
panic(err)
}
Expand All @@ -320,7 +320,7 @@ func (m *Manager) registerFlowForRoute(route *routerpkg.Route, name string, isSt
func (m *Manager) registerOneTimeFlow(name string, router *routerpkg.Router, messageTypes []appmessage.MessageCommand,
isStopping *uint32, stopChan chan error, initializeFunc flowInitializeFunc) *flow {

route, err := router.AddIncomingRoute(messageTypes)
route, err := router.AddIncomingRoute(name, messageTypes)
if err != nil {
panic(err)
}
Expand All @@ -346,12 +346,12 @@ func (m *Manager) registerOneTimeFlow(name string, router *routerpkg.Router, mes

func registerHandshakeRoutes(router *routerpkg.Router) (
receiveVersionRoute *routerpkg.Route, sendVersionRoute *routerpkg.Route) {
receiveVersionRoute, err := router.AddIncomingRoute([]appmessage.MessageCommand{appmessage.CmdVersion})
receiveVersionRoute, err := router.AddIncomingRoute("recieveVersion - incoming", []appmessage.MessageCommand{appmessage.CmdVersion})
if err != nil {
panic(err)
}

sendVersionRoute, err = router.AddIncomingRoute([]appmessage.MessageCommand{appmessage.CmdVerAck})
sendVersionRoute, err = router.AddIncomingRoute("sendVersion - incoming", []appmessage.MessageCommand{appmessage.CmdVerAck})
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion app/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *Manager) routerInitializer(router *router.Router, netConnection *netada
for messageType := range handlers {
messageTypes = append(messageTypes, messageType)
}
incomingRoute, err := router.AddIncomingRoute(messageTypes)
incomingRoute, err := router.AddIncomingRoute("rpc router", messageTypes)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion infrastructure/network/netadapter/netadapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
func routerInitializerForTest(t *testing.T, routes *sync.Map,
routeName string, wg *sync.WaitGroup) func(*router.Router, *NetConnection) {
return func(router *router.Router, connection *NetConnection) {
route, err := router.AddIncomingRoute([]appmessage.MessageCommand{appmessage.CmdPing})
route, err := router.AddIncomingRoute(routeName, []appmessage.MessageCommand{appmessage.CmdPing})
if err != nil {
t.Fatalf("TestNetAdapter: AddIncomingRoute failed: %+v", err)
}
Expand Down
14 changes: 8 additions & 6 deletions infrastructure/network/netadapter/router/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (

// Route represents an incoming or outgoing Router route
type Route struct {
name string
channel chan appmessage.Message
// closed and closeLock are used to protect us from writing to a closed channel
// reads use the channel's built-in mechanism to check if the channel is closed
Expand All @@ -37,12 +38,13 @@ type Route struct {
}

// NewRoute create a new Route
func NewRoute() *Route {
return newRouteWithCapacity(DefaultMaxMessages)
func NewRoute(name string) *Route {
return newRouteWithCapacity(name, DefaultMaxMessages)
}

func newRouteWithCapacity(capacity int) *Route {
func newRouteWithCapacity(name string, capacity int) *Route {
return &Route{
name: name,
channel: make(chan appmessage.Message, capacity),
closed: false,
capacity: capacity,
Expand All @@ -58,7 +60,7 @@ func (r *Route) Enqueue(message appmessage.Message) error {
return errors.WithStack(ErrRouteClosed)
}
if len(r.channel) == r.capacity {
return errors.Wrapf(ErrRouteCapacityReached, "reached capacity of %d", r.capacity)
return errors.Wrapf(ErrRouteCapacityReached, "route '%s' reached capacity of %d", r.name, r.capacity)
}
r.channel <- message
return nil
Expand All @@ -68,7 +70,7 @@ func (r *Route) Enqueue(message appmessage.Message) error {
func (r *Route) Dequeue() (appmessage.Message, error) {
message, isOpen := <-r.channel
if !isOpen {
return nil, errors.WithStack(ErrRouteClosed)
return nil, errors.Wrapf(ErrRouteClosed, "route '%s' is closed", r.name)
}
return message, nil
}
Expand All @@ -78,7 +80,7 @@ func (r *Route) Dequeue() (appmessage.Message, error) {
func (r *Route) DequeueWithTimeout(timeout time.Duration) (appmessage.Message, error) {
select {
case <-time.After(timeout):
return nil, errors.Wrapf(ErrTimeout, "got timeout after %s", timeout)
return nil, errors.Wrapf(ErrTimeout, "route '%s' got timeout after %s", r.name, timeout)
case message, isOpen := <-r.channel:
if !isOpen {
return nil, errors.WithStack(ErrRouteClosed)
Expand Down
11 changes: 6 additions & 5 deletions infrastructure/network/netadapter/router/router.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package router

import (
"fmt"
"sync"

"github.com/kaspanet/kaspad/app/appmessage"
Expand All @@ -26,15 +27,15 @@ type Router struct {
func NewRouter() *Router {
router := Router{
incomingRoutes: make(map[appmessage.MessageCommand]*Route),
outgoingRoute: newRouteWithCapacity(outgoingRouteMaxMessages),
outgoingRoute: newRouteWithCapacity("", outgoingRouteMaxMessages),
}
return &router
}

// AddIncomingRoute registers the messages of types `messageTypes` to
// be routed to the given `route`
func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Route, error) {
route := NewRoute()
func (r *Router) AddIncomingRoute(name string, messageTypes []appmessage.MessageCommand) (*Route, error) {
route := NewRoute(fmt.Sprintf("%s - incoming", name))
err := r.initializeIncomingRoute(route, messageTypes)
if err != nil {
return nil, err
Expand All @@ -44,8 +45,8 @@ func (r *Router) AddIncomingRoute(messageTypes []appmessage.MessageCommand) (*Ro

// AddIncomingRouteWithCapacity registers the messages of types `messageTypes` to
// be routed to the given `route` with a capacity of `capacity`
func (r *Router) AddIncomingRouteWithCapacity(capacity int, messageTypes []appmessage.MessageCommand) (*Route, error) {
route := newRouteWithCapacity(capacity)
func (r *Router) AddIncomingRouteWithCapacity(name string, capacity int, messageTypes []appmessage.MessageCommand) (*Route, error) {
route := newRouteWithCapacity(fmt.Sprintf("%s - incoming", name), capacity)
err := r.initializeIncomingRoute(route, messageTypes)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,19 +197,19 @@ outerLoop:
routesChan := make(chan *Routes)

routeInitializer := func(router *router.Router, netConnection *netadapter.NetConnection) {
handshakeRoute, err := router.AddIncomingRoute([]appmessage.MessageCommand{appmessage.CmdVersion, appmessage.CmdVerAck})
handshakeRoute, err := router.AddIncomingRoute("handshake", []appmessage.MessageCommand{appmessage.CmdVersion, appmessage.CmdVerAck})
if err != nil {
panic(errors.Wrap(err, "error registering handshake route"))
}
addressesRoute, err := router.AddIncomingRoute([]appmessage.MessageCommand{appmessage.CmdRequestAddresses, appmessage.CmdAddresses})
addressesRoute, err := router.AddIncomingRoute("addresses", []appmessage.MessageCommand{appmessage.CmdRequestAddresses, appmessage.CmdAddresses})
if err != nil {
panic(errors.Wrap(err, "error registering addresses route"))
}
pingRoute, err := router.AddIncomingRoute([]appmessage.MessageCommand{appmessage.CmdPing})
pingRoute, err := router.AddIncomingRoute("ping", []appmessage.MessageCommand{appmessage.CmdPing})
if err != nil {
panic(errors.Wrap(err, "error registering ping route"))
}
everythingElseRoute, err := router.AddIncomingRoute(everythingElse)
everythingElseRoute, err := router.AddIncomingRoute("everything else", everythingElse)
if err != nil {
panic(errors.Wrap(err, "error registering everythingElseRoute"))
}
Expand Down
2 changes: 1 addition & 1 deletion infrastructure/network/rpcclient/rpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func buildRPCRouter() (*rpcRouter, error) {
router := routerpkg.NewRouter()
routes := make(map[appmessage.MessageCommand]*routerpkg.Route, len(appmessage.RPCMessageCommandToString))
for messageType := range appmessage.RPCMessageCommandToString {
route, err := router.AddIncomingRoute([]appmessage.MessageCommand{messageType})
route, err := router.AddIncomingRoute("rpc client", []appmessage.MessageCommand{messageType})
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 069ee26

Please sign in to comment.