Skip to content

Commit

Permalink
implemented follow list data management condes and some REST I/F. and…
Browse files Browse the repository at this point in the history
… fixed bug which occurs when same tag name on tags property
  • Loading branch information
ryogrid committed Mar 31, 2024
1 parent 23cb401 commit 53ac8ef
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 49 deletions.
123 changes: 83 additions & 40 deletions api_server/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func NewNp2pEventForREST(evt *schema.Np2pEvent) *Np2pEventForREST {
tagsArr := make([][]string, 0)
for k, v := range evt.Tags {
tmpArr := make([]string, 0)
tmpArr = append(tmpArr, k)
r := []rune(k)
// remove duplicated tag suffix (ex: "p_0" -> "p")
tmpArr = append(tmpArr, string(r[0]))
for _, val := range v {
tmpArr = append(tmpArr, val.(string))
}
Expand All @@ -106,12 +108,21 @@ func NewNp2pEventForREST(evt *schema.Np2pEvent) *Np2pEventForREST {

func NewNp2pEventFromREST(evt *Np2pEventForREST) *schema.Np2pEvent {
tagsMap := make(map[string][]interface{})
tagCntMap := make(map[string]int)
for _, tag := range evt.Tags {
vals := make([]interface{}, 0)
for _, val := range tag[1:] {
vals = append(vals, val)
}
tagsMap[tag[0]] = vals
if _, ok := tagCntMap[tag[0]]; ok {
tagCntMap[tag[0]]++
tag[0] = fmt.Sprintf("%s_%d", tag[0], tagCntMap[tag[0]])
tagsMap[tag[0]] = vals
} else {
tagCntMap[tag[0]] = 0
tagsMap[tag[0]+"_0"] = vals
}

}

pkey, err := hex.DecodeString(evt.Pubkey)
Expand Down Expand Up @@ -187,6 +198,8 @@ func (s *ApiServer) publishHandler(w rest.ResponseWriter, req *rest.Request) {
s.sendPost(w, &input)
case core.KIND_EVT_PROFILE:
s.updateProfile(w, &input)
case core.KIND_EVT_FOLLOW_LIST:
s.setOrUpdateFollowList(w, &input)
case core.KIND_EVT_REACTION:
s.sendReaction(w, &input)
default:
Expand All @@ -195,6 +208,56 @@ func (s *ApiServer) publishHandler(w rest.ResponseWriter, req *rest.Request) {
}
}

func (s *ApiServer) sendPost(w rest.ResponseWriter, input *Np2pEventForREST) {
if input.Content == "" {
rest.Error(w, "Content is required", 400)
return
}

evt := NewNp2pEventFromREST(input)
s.buzzPeer.MessageMan.BcastOwnPost(evt)
// store for myself
s.buzzPeer.MessageMan.DataMan.StoreEvent(evt)

w.WriteJson(&EventsResp{})
}

func (s *ApiServer) updateProfile(w rest.ResponseWriter, input *Np2pEventForREST) {
if input.Tags == nil {
rest.Error(w, "Tags is null", http.StatusBadRequest)
return
}

evt := NewNp2pEventFromREST(input)
if *glo_val.SelfPubkey == evt.Pubkey {
s.buzzPeer.MessageMan.BcastProfile(evt)
// update local profile
glo_val.CurrentProfileEvt = evt
}

w.WriteJson(&GeneralResp{
"SUCCESS",
})
}

func (s *ApiServer) setOrUpdateFollowList(w rest.ResponseWriter, input *Np2pEventForREST) {
if input.Tags == nil {
rest.Error(w, "Tags is null", http.StatusBadRequest)
return
}

evt := NewNp2pEventFromREST(input)
if *glo_val.SelfPubkey == evt.Pubkey {
s.buzzPeer.MessageMan.DataMan.StoreEvent(evt)
// update local profile
glo_val.CurrentFollowListEvt = evt
}

w.WriteJson(&GeneralResp{
"SUCCESS",
})
}

func (s *ApiServer) sendReaction(w rest.ResponseWriter, input *Np2pEventForREST) {
evt := NewNp2pEventFromREST(input)
err := s.buzzPeer.MessageMan.UnicastEventData(evt.Tags["p"][0].(string), evt)
Expand All @@ -210,20 +273,6 @@ func (s *ApiServer) sendReaction(w rest.ResponseWriter, input *Np2pEventForREST)
w.WriteJson(&EventsResp{})
}

func (s *ApiServer) sendPost(w rest.ResponseWriter, input *Np2pEventForREST) {
if input.Content == "" {
rest.Error(w, "Content is required", 400)
return
}

evt := NewNp2pEventFromREST(input)
s.buzzPeer.MessageMan.BcastOwnPost(evt)
// store for myself
s.buzzPeer.MessageMan.DataMan.StoreEvent(evt)

w.WriteJson(&EventsResp{})
}

func (s *ApiServer) reqHandler(w rest.ResponseWriter, req *rest.Request) {
input := Np2pReqForREST{}
err := req.DecodeJsonPayload(&input)
Expand Down Expand Up @@ -256,7 +305,10 @@ func (s *ApiServer) reqHandler(w rest.ResponseWriter, req *rest.Request) {
s.getEvents(w, &input)
} else if slices.Contains(input.Kinds, core.KIND_REQ_PROFILE) {
s.getProfile(w, &input)
} else if slices.Contains(input.Kinds, core.KIND_REQ_FOLLOW_LIST) {
s.getFollowList(w, &input)
} else {

w.WriteJson(&EventsResp{
Events: []Np2pEventForREST{},
})
Expand All @@ -274,16 +326,25 @@ func (s *ApiServer) getProfile(w rest.ResponseWriter, input *Np2pReqForREST) {
// profile data will be included on response of "getEvents"
w.WriteJson(&EventsResp{Events: []Np2pEventForREST{}})
// request profile data for future
s.buzzPeer.MessageMan.UnicastProfileReq(shortPkey)
s.buzzPeer.MessageMan.UnicastProfileReq(shortPkey & 0x0000ffffffffffff)
}
}

func (s *ApiServer) getEvents(w rest.ResponseWriter, input *Np2pReqForREST) {
//if input.Since == 0 || input.Until == 0 {
// rest.Error(w, "value of since and untile is invalid", http.StatusBadRequest)
// return
//}
func (s *ApiServer) getFollowList(w rest.ResponseWriter, input *Np2pReqForREST) {
shortPkey := np2p_util.GetUint64FromHexPubKeyStr(input.Authors[0])
fListEvt := s.buzzPeer.MessageMan.DataMan.GetFollowListLocal(shortPkey)

if fListEvt != nil {
w.WriteJson(&EventsResp{Events: []Np2pEventForREST{*NewNp2pEventForREST(fListEvt)}})
} else {
// follow list data will be included on response of "getEvents"
w.WriteJson(&EventsResp{Events: []Np2pEventForREST{}})
// request profile data for future
s.buzzPeer.MessageMan.UnicastFollowListReq(shortPkey & 0x0000ffffffffffff)
}
}

func (s *ApiServer) getEvents(w rest.ResponseWriter, input *Np2pReqForREST) {
// for supporting Nostr clients
isPeriodSpecified := true
if input.Since == 0 {
Expand Down Expand Up @@ -333,24 +394,6 @@ func (s *ApiServer) gatherData(w rest.ResponseWriter, req *rest.Request) {
})
}

func (s *ApiServer) updateProfile(w rest.ResponseWriter, input *Np2pEventForREST) {
if input.Tags == nil {
rest.Error(w, "Tags is null", http.StatusBadRequest)
return
}

evt := NewNp2pEventFromREST(input)
if *glo_val.SelfPubkey == evt.Pubkey {
s.buzzPeer.MessageMan.BcastProfile(evt)
// update local profile
glo_val.CurrentProfileEvt = evt
}

w.WriteJson(&GeneralResp{
"SUCCESS",
})
}

func (s *ApiServer) LaunchAPIServer(addrStr string) {
api := rest.NewApi()

Expand Down
26 changes: 19 additions & 7 deletions core/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ type DataManager struct {
EvtListTimeKeyMtx *sync.Mutex
EvtMapIdKey sync.Map // [32]byte -> *schema.Np2pEvent
// latest profile only stored
ProfMap sync.Map // pubkey lower 64bit (uint64) -> *schema.Np2pEvent
EvtLogger *EventDataLogger
ProfEvtMap sync.Map // pubkey lower 64bit (uint64) -> *schema.Np2pEvent
FollowListEvtMap sync.Map // pubkey lower 64bit (uint64) -> *schema.Np2pEvent
EvtLogger *EventDataLogger
}

func NewDataManager() *DataManager {
return &DataManager{
EvtListTimeKey: sortedlist.NewTree(),
EvtListTimeKeyMtx: &sync.Mutex{},
EvtMapIdKey: sync.Map{},
ProfMap: sync.Map{},
ProfEvtMap: sync.Map{}, // pubkey lower 64bit (uint64) -> *schema.Np2pEvent
FollowListEvtMap: sync.Map{}, // pubkey lower 64bit (uint64) -> *schema.Np2pEvent
EvtLogger: NewEventDataLogger("./" + strconv.FormatUint(glo_val.SelfPubkey64bit, 16) + ".evtlog"),
}
}
Expand All @@ -38,7 +40,7 @@ func (dman *DataManager) StoreEvent(evt *schema.Np2pEvent) {
dman.EvtListTimeKey.Add(evt.Created_at, evt)
dman.EvtListTimeKeyMtx.Unlock()
if _, ok := dman.EvtMapIdKey.Load(evt.Id); ok {
dman.EvtMapIdKey.Store(evt.Id, evt)
// do nothing when it is duplicated
} else {
dman.EvtMapIdKey.Store(evt.Id, evt)
// log event data when it is not duplicated
Expand All @@ -58,11 +60,11 @@ func (dman *DataManager) StoreEvent(evt *schema.Np2pEvent) {
}

func (dman *DataManager) StoreProfile(evt *schema.Np2pEvent) {
dman.ProfMap.Store(np2p_util.GetLower64bitUint(evt.Pubkey), evt)
dman.ProfEvtMap.Store(np2p_util.GetLower64bitUint(evt.Pubkey), evt)
}

func (dman *DataManager) GetProfileLocal(pubkey64bit uint64) *schema.Np2pEvent {
if val, ok := dman.ProfMap.Load(pubkey64bit); ok {
if val, ok := dman.ProfEvtMap.Load(pubkey64bit); ok {
return val.(*schema.Np2pEvent)
}
return nil
Expand All @@ -72,11 +74,21 @@ func (dman *DataManager) GetLatestEvents(since int64, until int64) *[]*schema.Np
dman.EvtListTimeKeyMtx.Lock()
defer dman.EvtListTimeKeyMtx.Unlock()
itr := dman.EvtListTimeKey.Range(since, until)

ret := make([]*schema.Np2pEvent, 0)
for itr.Next() {
val := itr.Value()
ret = append(ret, val.(*schema.Np2pEvent))
}
return &ret
}

func (dman *DataManager) StoreFollowList(evt *schema.Np2pEvent) {
dman.FollowListEvtMap.Store(np2p_util.GetLower64bitUint(evt.Pubkey), evt)
}

func (dman *DataManager) GetFollowListLocal(pubkey64bit uint64) *schema.Np2pEvent {
if val, ok := dman.FollowListEvtMap.Load(pubkey64bit); ok {
return val.(*schema.Np2pEvent)
}
return nil
}
21 changes: 19 additions & 2 deletions core/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
const (
KIND_EVT_PROFILE = 0
KIND_EVT_POST = 1
KIND_EVT_FOLLOW_LIST = 3
KIND_EVT_REACTION = 7
KIND_REQ_PROFILE = KIND_EVT_PROFILE
KIND_REQ_POST = KIND_EVT_POST
KIND_REQ_FOLLOW_LIST = KIND_EVT_FOLLOW_LIST
KIND_REQ_SHARE_EVT_DATA = 40000
)

Expand Down Expand Up @@ -63,6 +65,13 @@ func (mm *MessageManager) handleRecvMsgBcastEvt(src mesh.PeerName, pkt *schema.N
go mm.UnicastProfileReq(shortId & 0x0000ffffffffffff)
}
}
case KIND_EVT_FOLLOW_LIST:
mm.DataMan.StoreFollowList(evt)
if evt.Pubkey == *glo_val.SelfPubkey && (glo_val.CurrentFollowListEvt == nil || glo_val.CurrentFollowListEvt.Created_at < evt.Created_at) {
// this route works only when recovery
//glo_val.ProfileMyOwn = prof
glo_val.CurrentProfileEvt = evt
}
case KIND_EVT_REACTION:
// do nothing
default:
Expand Down Expand Up @@ -114,6 +123,8 @@ func (mm *MessageManager) handleRecvMsgUnicast(src mesh.PeerName, pkt *schema.Np
mm.DataMan.StoreProfile(evt)
case KIND_EVT_POST: // response of KIND_REQ_SHARE_EVT_DATA
// do nothing
case KIND_EVT_FOLLOW_LIST: // response of KIND_REQ_FOLLOW_LIST
mm.DataMan.StoreFollowList(evt) // store received follow list data
case KIND_EVT_REACTION:
// do nothing
default:
Expand Down Expand Up @@ -172,15 +183,21 @@ func (mm *MessageManager) BcastProfile(evt *schema.Np2pEvent) {
func (mm *MessageManager) UnicastProfileReq(pubkey64bit uint64) {
reqs := []*schema.Np2pReq{schema.NewNp2pReq(KIND_REQ_PROFILE, nil)}
pkt := schema.NewNp2pPacket(nil, &reqs)
mm.SendMsgUnicast(mesh.PeerName(pubkey64bit), pkt)
mm.SendMsgUnicast(mesh.PeerName(pubkey64bit&0x0000ffffffffffff), pkt)
}

func (mm *MessageManager) UnicastFollowListReq(pubkey64bit uint64) {
reqs := []*schema.Np2pReq{schema.NewNp2pReq(KIND_REQ_FOLLOW_LIST, nil)}
pkt := schema.NewNp2pPacket(nil, &reqs)
mm.SendMsgUnicast(mesh.PeerName(pubkey64bit&0x0000ffffffffffff), pkt)
}

// used for response of profile request
func (mm *MessageManager) UnicastOwnProfile(dest uint64) {
if glo_val.CurrentProfileEvt != nil {
// send latest profile data
events := []*schema.Np2pEvent{glo_val.CurrentProfileEvt}
mm.SendMsgUnicast(mesh.PeerName(dest), schema.NewNp2pPacket(&events, nil))
mm.SendMsgUnicast(mesh.PeerName(dest&0x0000ffffffffffff), schema.NewNp2pPacket(&events, nil))
}
}

Expand Down
1 change: 1 addition & 0 deletions glo_val/glo_val.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ var SelfPubkey64bit uint64 // initialized at creation of Np2pPe
// var Nickname *string // initialized at server launch
// var ProfileMyOwn *schema.Np2pProfile
var CurrentProfileEvt *schema.Np2pEvent
var CurrentFollowListEvt *schema.Np2pEvent

var IsEnabledSSL bool = false

Expand Down

0 comments on commit 53ac8ef

Please sign in to comment.