Skip to content

Swarm accounting #18050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest

delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions)
streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions, nil)
teardown := func() {
streamer.Close()
removeDataDir()
Expand Down
8 changes: 4 additions & 4 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestRequestFromPeers(t *testing.T) {
Peer: protocolsPeer,
}, to)
to.On(peer)
r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)

// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
Peer: protocolsPeer,
}, to)
to.On(peer)
r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
r := NewRegistry(addr.ID(), delivery, nil, nil, nil, nil)
// an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
sp := &Peer{
Peer: protocolsPeer,
Expand Down Expand Up @@ -481,7 +481,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
Retrieval: RetrievalEnabled,
})
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
Expand Down Expand Up @@ -656,7 +656,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
Syncing: SyncingDisabled,
Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
})
}, nil)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
SkipCheck: skipCheck,
})
}, nil)
bucket.Store(bucketKeyRegistry, r)

r.RegisterClientFunc(externalStreamName, func(p *Peer, t string, live bool) (Client, error) {
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/snapshot_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no
Retrieval: RetrievalEnabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
})
}, nil)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
Expand Down
4 changes: 2 additions & 2 deletions swarm/network/stream/snapshot_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
})
}, nil)

bucket.Store(bucketKeyRegistry, r)

Expand Down Expand Up @@ -362,7 +362,7 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
Retrieval: RetrievalDisabled,
Syncing: SyncingRegisterOnly,
})
}, nil)
bucket.Store(bucketKeyRegistry, r)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
Expand Down
76 changes: 51 additions & 25 deletions swarm/network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -87,6 +88,9 @@ type Registry struct {
intervalsStore state.Store
autoRetrieval bool //automatically subscribe to retrieve request stream
maxPeerServers int
spec *protocols.Spec //this protocol's spec
balance protocols.Balance //implements protocols.Balance, for accounting
prices protocols.Prices //implements protocols.Prices, provides prices to accounting
}

// RegistryOptions holds optional values for NewRegistry constructor.
Expand All @@ -99,7 +103,7 @@ type RegistryOptions struct {
}

// NewRegistry is Streamer constructor
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry {
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry {
if options == nil {
options = &RegistryOptions{}
}
Expand All @@ -119,7 +123,10 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
intervalsStore: intervalsStore,
autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
balance: balance,
}
streamer.setupSpec()

streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer

Expand Down Expand Up @@ -228,6 +235,17 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
return streamer
}

//we need to construct a spec instance per node instance
func (r *Registry) setupSpec() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please have this as setupProtocolSpec? and the corresponding createSpec as createProtocolSpec? these function names are over-generalised

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason for that? I have had many "obvious" wording in previous PRs required to be edited away, The variable was called Spec before and in p2p/protocols/protocol.go it is still just called Spec. So I don't see a over-generalization here?

//first create the "bare" spec
r.createSpec()
//if balance is nil, this node has been started without swap support (swapEnabled flag is false)
if r.balance != nil && !reflect.ValueOf(r.balance).IsNil() {
//swap is enabled, so setup the hook
r.spec.Hook = protocols.NewAccounting(r.balance, r.prices)
}
}

// RegisterClient registers an incoming streamer constructor
func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error)) {
r.clientMu.Lock()
Expand Down Expand Up @@ -492,7 +510,7 @@ func (r *Registry) updateSyncing() {
}

func (r *Registry) runProtocol(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := protocols.NewPeer(p, rw, Spec)
peer := protocols.NewPeer(p, rw, r.spec)
bp := network.NewBzzPeer(peer)
np := network.NewPeer(bp, r.delivery.kad)
r.delivery.kad.On(np)
Expand Down Expand Up @@ -716,35 +734,43 @@ func (c *clientParams) clientCreated() {
close(c.clientCreatedC)
}

// Spec is the spec of the streamer protocol
var Spec = &protocols.Spec{
Name: "stream",
Version: 8,
MaxMsgSize: 10 * 1024 * 1024,
Messages: []interface{}{
UnsubscribeMsg{},
OfferedHashesMsg{},
WantedHashesMsg{},
TakeoverProofMsg{},
SubscribeMsg{},
RetrieveRequestMsg{},
ChunkDeliveryMsgRetrieval{},
SubscribeErrorMsg{},
RequestSubscriptionMsg{},
QuitMsg{},
ChunkDeliveryMsgSyncing{},
},
//GetSpec returns the streamer spec to callers
//This used to be a global variable but for simulations with
//multiple nodes its fields (notably the Hook) would be overwritten
func (r *Registry) GetSpec() *protocols.Spec {
return r.spec
}

func (r *Registry) createSpec() {
// Spec is the spec of the streamer protocol
var spec = &protocols.Spec{
Name: "stream",
Version: 8,
MaxMsgSize: 10 * 1024 * 1024,
Messages: []interface{}{
UnsubscribeMsg{},
OfferedHashesMsg{},
WantedHashesMsg{},
TakeoverProofMsg{},
SubscribeMsg{},
RetrieveRequestMsg{},
ChunkDeliveryMsgRetrieval{},
SubscribeErrorMsg{},
RequestSubscriptionMsg{},
QuitMsg{},
ChunkDeliveryMsgSyncing{},
},
}
r.spec = spec
}

func (r *Registry) Protocols() []p2p.Protocol {
return []p2p.Protocol{
{
Name: Spec.Name,
Version: Spec.Version,
Length: Spec.Length(),
Name: r.spec.Name,
Version: r.spec.Version,
Length: r.spec.Length(),
Run: r.runProtocol,
// NodeInfo: ,
// PeerInfo: ,
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
Retrieval: RetrievalDisabled,
Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
})
}, nil)

fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
bucket.Store(bucketKeyFileStore, fileStore)
Expand Down
93 changes: 93 additions & 0 deletions swarm/swap/swap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package swap

import (
"errors"
"fmt"
"strconv"
"sync"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/state"
)

// SwAP Swarm Accounting Protocol
// a peer to peer micropayment system
// A node maintains an individual balance with every peer
// Only messages which have a price will be accounted for
type Swap struct {
stateStore state.Store //stateStore is needed in order to keep balances across sessions
lock sync.RWMutex //lock the balances
balances map[enode.ID]int64 //map of balances for each peer
}

// New - swap constructor
func New(stateStore state.Store) (swap *Swap) {
swap = &Swap{
stateStore: stateStore,
balances: make(map[enode.ID]int64),
}
return
}

//Swap implements the protocols.Balance interface
//Add is the (sole) accounting function
func (s *Swap) Add(amount int64, peer *protocols.Peer) (err error) {
s.lock.Lock()
defer s.lock.Unlock()

//load existing balances from the state store
err = s.loadState(peer)
if err != nil && err != state.ErrNotFound {
return
}
//adjust the balance
//if amount is negative, it will decrease, otherwise increase
s.balances[peer.ID()] += amount
//save the new balance to the state store
peerBalance := s.balances[peer.ID()]
err = s.stateStore.Put(peer.ID().String(), &peerBalance)

log.Debug(fmt.Sprintf("balance for peer %s: %s", peer.ID().String(), strconv.FormatInt(peerBalance, 10)))
return err
}

//GetPeerBalance returns the balance for a given peer
func (swap *Swap) GetPeerBalance(peer enode.ID) (int64, error) {
swap.lock.RLock()
defer swap.lock.RUnlock()
if p, ok := swap.balances[peer]; ok {
return p, nil
}
return 0, errors.New("Peer not found")
}

//load balances from the state store (persisted)
func (s *Swap) loadState(peer *protocols.Peer) (err error) {
var peerBalance int64
peerID := peer.ID()
//only load if the current instance doesn't already have this peer's
//balance in memory
if _, ok := s.balances[peerID]; !ok {
err = s.stateStore.Get(peerID.String(), &peerBalance)
s.balances[peerID] = peerBalance
}
return
}
Loading