Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2004,7 +2004,7 @@ var (
}
syncInitStreamsFlag = cli.IntFlag{
Name: "sync.init-peers",
Usage: "Initial shard-wise number of peers to start syncing",
Usage: "Initial shard-wise number of peers to start syncing (lower values reduce startup time)",
Hidden: true,
}
syncMaxAdvertiseWaitTimeFlag = cli.IntFlag{
Expand All @@ -2014,12 +2014,12 @@ var (
}
syncDiscSoftLowFlag = cli.IntFlag{
Name: "sync.disc.soft-low-cap",
Usage: "Soft low cap for sync stream management",
Usage: "Soft low cap for sync stream management (triggers discovery every 30s when below this)",
Hidden: true,
}
syncDiscHardLowFlag = cli.IntFlag{
Name: "sync.disc.hard-low-cap",
Usage: "Hard low cap for sync stream management",
Usage: "Hard low cap for sync stream management (triggers immediate discovery when below this)",
Hidden: true,
}
syncDiscHighFlag = cli.IntFlag{
Expand All @@ -2029,7 +2029,7 @@ var (
}
syncDiscBatchFlag = cli.IntFlag{
Name: "sync.disc.batch",
Usage: "batch size of the sync discovery",
Usage: "batch size of the sync discovery (higher values can reduce startup time)",
Hidden: true,
}
)
Expand Down
4 changes: 4 additions & 0 deletions p2p/stream/common/ratelimiter/ratelimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (sm *testStreamManager) SubscribeRemoveStreamEvent(ch chan<- streammanager.
return sm.removeFeed.Subscribe(ch)
}

func (sm *testStreamManager) SetEnoughStreamsCallback(callback func()) {
// No-op for test implementation
}

func (sm *testStreamManager) removeStream(stid sttypes.StreamID) {
sm.removeFeed.Send(streammanager.EvtStreamRemoved{ID: stid})
}
Expand Down
4 changes: 4 additions & 0 deletions p2p/stream/common/requestmanager/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (sm *testStreamManager) SubscribeRemoveStreamEvent(ch chan<- streammanager.
return sm.rmStreamFeed.Subscribe(ch)
}

func (sm *testStreamManager) SetEnoughStreamsCallback(callback func()) {
// No-op for test implementation
}

func (sm *testStreamManager) GetStreams() []sttypes.Stream {
sm.lock.Lock()
defer sm.lock.Unlock()
Expand Down
1 change: 1 addition & 0 deletions p2p/stream/common/streammanager/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type ReaderSubscriber interface {
type Operator interface {
NewStream(stream sttypes.Stream) error
RemoveStream(stID sttypes.StreamID, reason string, criticalErr bool) error
SetEnoughStreamsCallback(callback func()) // Add callback for when enough streams are found
}

// Subscriber is the interface to support stream event subscription
Expand Down
14 changes: 14 additions & 0 deletions p2p/stream/common/streammanager/streammanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type streamManager struct {

// limit concurrent setup of streams
setupSem chan struct{}

// callback for when enough streams are found
enoughStreamsCallback func()
}

type RemovalInfo struct {
Expand Down Expand Up @@ -136,6 +139,11 @@ func (rm *RemovalInfo) ResetCount() {
rm.count = 0
}

// SetEnoughStreamsCallback sets the callback function to be called when enough streams are found
func (sm *streamManager) SetEnoughStreamsCallback(callback func()) {
sm.enoughStreamsCallback = callback
}

// NewStreamManager creates a new stream manager for the given proto ID
func NewStreamManager(pid sttypes.ProtoID, host host, pf peerFinder, handleStream func(network.Stream), c Config) StreamManager {
return newStreamManager(pid, host, pf, handleStream, c)
Expand Down Expand Up @@ -379,6 +387,12 @@ func (sm *streamManager) handleAddStream(st sttypes.Stream) error {
sm.addStreamFeed.Send(EvtStreamAdded{st})
addedStreamsCounterVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Inc()
numStreamsGaugeVec.With(prometheus.Labels{"topic": string(sm.myProtoID)}).Set(float64(sm.streams.size()))

// Call callback if enough streams are found
if sm.enoughStreamsCallback != nil && sm.softHaveEnoughStreams() {
sm.enoughStreamsCallback()
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions p2p/stream/protocols/sync/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,10 @@ func (sm *testStreamManager) SubscribeRemoveStreamEvent(chan<- streammanager.Evt
return nil
}

func (sm *testStreamManager) SetEnoughStreamsCallback(callback func()) {
// No-op for test implementation
}

func (sm *testStreamManager) NewStream(stream sttypes.Stream) error {
stid := stream.ID()
for _, id := range sm.streamIDs {
Expand Down
58 changes: 58 additions & 0 deletions p2p/stream/protocols/sync/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,62 @@ const (
// rateLimiterSingleRequestsPerSecond is the request per second limit for a single stream in the sync protocol.
// This constant helps prevent the node resource from exhausting from a single remote node.
rateLimiterSingleRequestsPerSecond = 10

// Advertisement timing constants for startup mode optimization
// Normal mode timings
BaseTimeoutNormal = 300 * time.Second // 5 minutes base timeout
TimeoutIncrementStepNormal = 30 * time.Second // 30 seconds increment
MaxTimeoutNormal = 600 * time.Second // 10 minutes max timeout
BackoffTimeRatioNormal = 5 * time.Second // 5 seconds base backoff
MaxBackoffNormal = 30 * time.Second // 30 seconds max backoff

// Startup mode timings (faster for initial peer discovery)
BaseTimeoutStartup = 30 * time.Second // 30 seconds base timeout
TimeoutIncrementStepStartup = 10 * time.Second // 10 seconds increment
MaxTimeoutStartup = 120 * time.Second // 2 minutes max timeout
BackoffTimeRatioStartup = 2 * time.Second // 2 seconds base backoff
MaxBackoffStartup = 15 * time.Second // 15 seconds max backoff

// Advertisement loop timing constants
MinSleepTimeNormal = 30 * time.Second // Minimum sleep time in normal mode
MaxSleepTimeNormal = 60 * time.Minute // Maximum sleep time in normal mode
MinSleepTimeStartup = 10 * time.Second // Minimum sleep time in startup mode
MaxSleepTimeStartup = 2 * time.Minute // Maximum sleep time in startup mode

// Startup mode duration
StartupModeDuration = 10 * time.Minute // How long to stay in startup mode

// Adaptive timing constants
SleepIncreasePerPeer = 1 * time.Second // Increase sleep time per peer found
SleepDecreaseRatio = 0.7 // Decrease sleep time by 30% when no peers found

// DHT Request Limits - How many peers to request from DHT
// These should be higher than target limits because DHT may return invalid peers
// Based on stream sync configuration and realistic peer discovery ratios:
// - Mainnet: Request 20, expect ~8 valid (40% success rate)
// - Testnet: Request 8, expect ~2 valid (25% success rate)
// - Devnet: Request 12, expect ~4 valid (33% success rate)
DHTRequestLimitMainnet = 20 // Request 20, expect ~8 valid
DHTRequestLimitTestnet = 10 // Request 10, expect ~3 valid
DHTRequestLimitPangaea = 10 // Request 10, expect ~3 valid
DHTRequestLimitPartner = 10 // Request 10, expect ~3 valid
DHTRequestLimitStressnet = 12 // Request 12, expect ~4 valid
DHTRequestLimitDevnet = 12 // Request 12, expect ~4 valid
DHTRequestLimitLocalnet = 12 // Request 12, expect ~4 valid

// Target Valid Peer Counts - How many valid peers we want to find
// These are the actual peer counts we aim for after filtering
// Based on stream sync configuration requirements:
// - Mainnet: InitStreams=8, DiscSoftLowCap=8, DiscHardLowCap=6
// - Testnet: InitStreams=3, DiscSoftLowCap=3, DiscHardLowCap=3
// - Localnet: InitStreams=4, DiscSoftLowCap=4, DiscHardLowCap=4
// - Partner: InitStreams=3, DiscSoftLowCap=3, DiscHardLowCap=3
// - Else: InitStreams=4, DiscSoftLowCap=4, DiscHardLowCap=4
TargetValidPeersMainnet = 8 // Target 8 valid peers (matches InitStreams)
TargetValidPeersTestnet = 3 // Target 3 valid peers (matches InitStreams)
TargetValidPeersPangaea = 3 // Target 3 valid peers (testnet-like)
TargetValidPeersPartner = 3 // Target 3 valid peers (matches InitStreams)
TargetValidPeersStressnet = 4 // Target 4 valid peers (else config)
TargetValidPeersDevnet = 4 // Target 4 valid peers (else config)
TargetValidPeersLocalnet = 4 // Target 4 valid peers (matches InitStreams)
)
Loading