-
Notifications
You must be signed in to change notification settings - Fork 3
/
event_resender.go
84 lines (77 loc) · 2.33 KB
/
event_resender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package core
import (
"context"
"fmt"
"github.com/ryogrid/nostrp2p/np2p_const"
"github.com/ryogrid/nostrp2p/schema"
"time"
)
// EventResender periodically re-sends events that are still on the
// resend-needed list and removes events that exceed the store amount limit.
// Both tasks are carried out by ResendEvents, which Start launches in a
// background goroutine.
type EventResender struct {
// dman provides the resend-needed list, event lookup and the removal operations.
dman DataManager
// msgMan delivers the resend packets to each destination via unicast.
msgMan *MessageManager
// cancel points to the cancel function of the context created by Start;
// NewEventResender leaves it nil.
cancel *context.CancelFunc
}
// NewEventResender builds an EventResender bound to the given data manager
// and message manager. The returned resender is idle: no background work
// runs until Start is called.
func NewEventResender(dman DataManager, msgMan *MessageManager) *EventResender {
	resender := new(EventResender)
	resender.dman = dman
	resender.msgMan = msgMan
	return resender
}
// Start launches the background resend loop (ResendEvents) with a fresh
// cancellable context, ticking every np2p_const.ResendCcheckInterval.
//
// If a loop from an earlier Start call may still be running, it is cancelled
// first so that at most one loop is active and no goroutine is left without
// a way to stop it. Cancelling an already-cancelled context is a no-op, so
// calling Start again after Stop is also safe.
func (er *EventResender) Start() {
	if er.cancel != nil {
		// Stop the previous loop; otherwise its cancel function would be
		// overwritten below and that goroutine could never be stopped.
		(*er.cancel)()
	}

	ctx, cancel := context.WithCancel(context.Background())
	er.cancel = &cancel
	go er.ResendEvents(ctx, np2p_const.ResendCcheckInterval)
}
// Stop cancels the context handed to the background resend loop, which makes
// ResendEvents return. It is a no-op when Start has not been called yet, and
// calling it more than once is harmless.
func (er *EventResender) Stop() {
	if er.cancel == nil {
		// Start was never called: there is nothing to cancel, and
		// dereferencing the nil pointer would panic.
		return
	}
	(*er.cancel)()
}
// ResendEvents runs the periodic resend loop. It blocks until ctx is
// cancelled and is meant to be run in its own goroutine (see Start).
//
// On every tick of interval it performs one resend cycle: due events on the
// resend-needed list are re-sent, expired entries are dropped, and events
// over the store amount limit are removed.
func (er *EventResender) ResendEvents(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	// Release the ticker once the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			er.runResendCycle()
		}
	}
}

// resendDelayMin returns the number of minutes after an event's creation at
// which the n-th resend is due: ResendTimeBaseMin * 2^n.
//
// The power of two is computed with a shift because `^` is bitwise XOR in
// Go, not exponentiation.
func resendDelayMin(n int) int64 {
	return int64(np2p_const.ResendTimeBaseMin) * (int64(1) << uint(n))
}

// runResendCycle performs a single pass over the resend-needed list.
//
// For each entry whose age in whole minutes equals one of the scheduled
// resend delays, the stored event is sent to every destination of the entry,
// and the entry is removed from the list when a send succeeds. Entries older
// than the last scheduled delay are removed without being sent. Afterwards
// events over the store amount limit are removed. When the data manager
// returns no iterator the whole cycle, including that cleanup, is skipped.
func (er *EventResender) runResendCycle() {
	fmt.Println("ResendEvents: start")
	itr := er.dman.GetReSendNeededEventItr()
	if itr == nil {
		fmt.Println("on re-send iterating, skip this time because of no entries")
		return
	}

	// Delay of the last scheduled resend; older entries are given up on.
	maxDelayMin := resendDelayMin(np2p_const.ResendMaxTimes)
	unixtime := time.Now().Unix()

	for itr.Next() {
		val := itr.Value()
		if val == nil {
			fmt.Println("on re-send iterating, strange nil value found")
			continue
		}
		// Checked assertion: an unexpected or typed-nil value must not
		// panic the background goroutine.
		resendEvt, ok := val.(*schema.ResendEvent)
		if !ok || resendEvt == nil {
			fmt.Println("on re-send iterating, unexpected value type found")
			continue
		}

		elapsedMin := (unixtime - resendEvt.CreatedAt) / 60
		if evt, found := er.dman.GetEventById(resendEvt.EvtId); found {
			for n := 1; n <= np2p_const.ResendMaxTimes; n++ {
				// Only resend when the elapsed minutes match a scheduled delay.
				if elapsedMin != resendDelayMin(n) {
					continue
				}
				for _, destId := range resendEvt.DestIds {
					evtArr := []*schema.Np2pEvent{evt}
					// A failed send is left on the list so that a later
					// scheduled delay can retry it.
					if err := er.msgMan.SendMsgUnicast(destId, schema.NewNp2pPacket(&evtArr, nil)); err == nil {
						// remove from resend needed list
						er.dman.RemoveReSendNeededEvent(resendEvt, evt)
					}
				}
				break
			}
		}

		// Past the last scheduled resend: give up and drop the entry.
		if elapsedMin > maxDelayMin {
			er.dman.RemoveReSendNeededEvent(resendEvt, nil)
		}
	}

	// remove events exceeding the store amount limit from DB
	er.dman.RemoveStoreAmountLimitOveredEvents()
}