@@ -263,22 +263,19 @@ func NewPushGossiper[T Gossipable](
263
263
mempool Set [T ],
264
264
client * p2p.Client ,
265
265
metrics Metrics ,
266
- numValidators int ,
267
- numNonValidators int ,
268
- numPeers int ,
266
+ gossipParams BranchingFactor ,
267
+ regossipParams BranchingFactor ,
269
268
discardedSize int ,
270
269
targetGossipSize int ,
271
270
maxRegossipFrequency time.Duration ,
272
271
) (* PushGossiper [T ], error ) {
272
+ if err := gossipParams .Verify (); err != nil {
273
+ return nil , fmt .Errorf ("invalid gossip params: %w" , err )
274
+ }
275
+ if err := regossipParams .Verify (); err != nil {
276
+ return nil , fmt .Errorf ("invalid regossip params: %w" , err )
277
+ }
273
278
switch {
274
- case numValidators < 0 :
275
- return nil , ErrInvalidNumValidators
276
- case numNonValidators < 0 :
277
- return nil , ErrInvalidNumNonValidators
278
- case numPeers < 0 :
279
- return nil , ErrInvalidNumPeers
280
- case max (numValidators , numNonValidators , numPeers ) == 0 :
281
- return nil , ErrInvalidNumToGossip
282
279
case discardedSize < 0 :
283
280
return nil , ErrInvalidDiscardedSize
284
281
case targetGossipSize < 0 :
@@ -292,9 +289,8 @@ func NewPushGossiper[T Gossipable](
292
289
set : mempool ,
293
290
client : client ,
294
291
metrics : metrics ,
295
- numValidators : numValidators ,
296
- numNonValidators : numNonValidators ,
297
- numPeers : numPeers ,
292
+ gossipParams : gossipParams ,
293
+ regossipParams : regossipParams ,
298
294
targetGossipSize : targetGossipSize ,
299
295
maxRegossipFrequency : maxRegossipFrequency ,
300
296
@@ -312,9 +308,8 @@ type PushGossiper[T Gossipable] struct {
312
308
client * p2p.Client
313
309
metrics Metrics
314
310
315
- numValidators int
316
- numNonValidators int
317
- numPeers int
311
+ gossipParams BranchingFactor
312
+ regossipParams BranchingFactor
318
313
targetGossipSize int
319
314
maxRegossipFrequency time.Duration
320
315
@@ -326,6 +321,27 @@ type PushGossiper[T Gossipable] struct {
326
321
discarded * cache.LRU [ids.ID , struct {}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
327
322
}
328
323
324
+ type BranchingFactor struct {
325
+ Validators int
326
+ NonValidators int
327
+ Peers int
328
+ }
329
+
330
+ func (b * BranchingFactor ) Verify () error {
331
+ switch {
332
+ case b .Validators < 0 :
333
+ return ErrInvalidNumValidators
334
+ case b .NonValidators < 0 :
335
+ return ErrInvalidNumNonValidators
336
+ case b .Peers < 0 :
337
+ return ErrInvalidNumPeers
338
+ case max (b .Validators , b .NonValidators , b .Peers ) == 0 :
339
+ return ErrInvalidNumToGossip
340
+ default :
341
+ return nil
342
+ }
343
+ }
344
+
329
345
type tracking struct {
330
346
addedTime float64 // unix nanoseconds
331
347
lastGossiped time.Time
@@ -348,46 +364,46 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
348
364
return nil
349
365
}
350
366
351
- var (
352
- sentBytes = 0
353
- gossip = make ([][]byte , 0 , defaultGossipableCount )
354
- )
355
-
356
- // Iterate over all unsent gossipables.
357
- for sentBytes < p .targetGossipSize {
358
- gossipable , ok := p .toGossip .PopLeft ()
359
- if ! ok {
360
- break
361
- }
362
-
363
- // Ensure item is still in the set before we gossip.
364
- gossipID := gossipable .GossipID ()
365
- tracking := p .tracking [gossipID ]
366
- if ! p .set .Has (gossipID ) {
367
- delete (p .tracking , gossipID )
368
- p .addedTimeSum -= tracking .addedTime
369
- continue
370
- }
371
-
372
- bytes , err := p .marshaller .MarshalGossip (gossipable )
373
- if err != nil {
374
- delete (p .tracking , gossipID )
375
- p .addedTimeSum -= tracking .addedTime
376
- return err
377
- }
367
+ if err := p .gossip (
368
+ ctx ,
369
+ now ,
370
+ p .gossipParams ,
371
+ p .toGossip ,
372
+ p .toRegossip ,
373
+ & cache.Empty [ids.ID , struct {}]{}, // Don't mark dropped unsent transactions as discarded
374
+ ); err != nil {
375
+ return fmt .Errorf ("unexpected error during gossip: %w" , err )
376
+ }
378
377
379
- gossip = append (gossip , bytes )
380
- sentBytes += len (bytes )
381
- p .toRegossip .PushRight (gossipable )
382
- tracking .lastGossiped = now
378
+ if err := p .gossip (
379
+ ctx ,
380
+ now ,
381
+ p .regossipParams ,
382
+ p .toRegossip ,
383
+ p .toRegossip ,
384
+ p .discarded , // Mark dropped sent transactions as discarded
385
+ ); err != nil {
386
+ return fmt .Errorf ("unexpected error during regossip: %w" , err )
383
387
}
388
+ return nil
389
+ }
384
390
385
- maxLastGossipTimeToRegossip := now .Add (- p .maxRegossipFrequency )
391
+ func (p * PushGossiper [T ]) gossip (
392
+ ctx context.Context ,
393
+ now time.Time ,
394
+ gossipParams BranchingFactor ,
395
+ toGossip buffer.Deque [T ],
396
+ toRegossip buffer.Deque [T ],
397
+ discarded cache.Cacher [ids.ID , struct {}],
398
+ ) error {
399
+ var (
400
+ sentBytes = 0
401
+ gossip = make ([][]byte , 0 , defaultGossipableCount )
402
+ maxLastGossipTimeToRegossip = now .Add (- p .maxRegossipFrequency )
403
+ )
386
404
387
- // Iterate over all previously sent gossipables to fill any remaining space
388
- // in the gossip batch.
389
405
for sentBytes < p .targetGossipSize {
390
- gossipable , ok := p . toRegossip .PopLeft ()
406
+ gossipable , ok := toGossip .PopLeft ()
391
407
if ! ok {
392
408
break
393
409
}
@@ -398,29 +414,28 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
398
414
if ! p .set .Has (gossipID ) {
399
415
delete (p .tracking , gossipID )
400
416
p .addedTimeSum -= tracking .addedTime
401
- p . discarded .Put (gossipID , struct {}{}) // only add to discarded if previously sent
417
+ discarded .Put (gossipID , struct {}{}) // Cache that the item was dropped
402
418
continue
403
419
}
404
420
405
421
// Ensure we don't attempt to send a gossipable too frequently.
406
422
if maxLastGossipTimeToRegossip .Before (tracking .lastGossiped ) {
407
423
// Put the gossipable on the front of the queue to keep items sorted
408
424
// by last issuance time.
409
- p . toRegossip .PushLeft (gossipable )
425
+ toGossip .PushLeft (gossipable )
410
426
break
411
427
}
412
428
413
429
bytes , err := p .marshaller .MarshalGossip (gossipable )
414
430
if err != nil {
415
- // Should never happen because we've already sent this once.
416
431
delete (p .tracking , gossipID )
417
432
p .addedTimeSum -= tracking .addedTime
418
433
return err
419
434
}
420
435
421
436
gossip = append (gossip , bytes )
422
437
sentBytes += len (bytes )
423
- p . toRegossip .PushRight (gossipable )
438
+ toRegossip .PushRight (gossipable )
424
439
tracking .lastGossiped = now
425
440
}
426
441
@@ -448,9 +463,9 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
448
463
return p .client .AppGossip (
449
464
ctx ,
450
465
msgBytes ,
451
- p . numValidators ,
452
- p . numNonValidators ,
453
- p . numPeers ,
466
+ gossipParams . Validators ,
467
+ gossipParams . NonValidators ,
468
+ gossipParams . Peers ,
454
469
)
455
470
}
456
471
0 commit comments