Skip to content

Commit

Permalink
Merge pull request #1955 from dedis/work-be1-etienne-stuart-vector-cl…
Browse files Browse the repository at this point in the history
…ock-fix

[BE1] process meeting messages and include unprocessed messages in rumor state answer
  • Loading branch information
sgueissa authored Jun 27, 2024
2 parents 81e32e9 + 1aab4b1 commit 4ea172c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 25 deletions.
74 changes: 50 additions & 24 deletions be1-go/internal/database/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,29 +465,17 @@ func (s *SQLite) GetAllRumorParams() ([]mrumor.ParamsRumor, error) {
if err != nil {
return nil, poperrors.NewDatabaseSelectErrorMsg("all rumors: %v", err)
}

defer rows.Close()

params := make([]mrumor.ParamsRumor, 0)
for rows.Next() {
var rumorID int
var sender string
var timestampByte []byte
if err = rows.Scan(&rumorID, &sender, &timestampByte); err != nil {
return nil, poperrors.NewDatabaseScanErrorMsg(err.Error())
}
if rumorID == myRumorID && sender == mySender {
continue
}
var timestamp mrumor.RumorTimestamp
if err = json.Unmarshal(timestampByte, &timestamp); err != nil {
return nil, poperrors.NewInternalServerError("failed to unmarshal timestamp: %v", err)
}

messages, err := s.GetMessagesFromRumorHelper(tx, rumorID, sender)
param, err := s.GetRumorParamsHelper(tx, rows, mySender, myRumorID)
if err != nil {
return nil, err
} else if param.SenderID == "" {
continue
}
param := newRumorParams(rumorID, sender, messages, timestamp)
params = append(params, param)
}

Expand All @@ -498,31 +486,69 @@ func (s *SQLite) GetAllRumorParams() ([]mrumor.ParamsRumor, error) {
return params, nil
}

func (s *SQLite) GetMessagesFromRumorHelper(tx *sql.Tx, rumorID int, sender string) (map[string][]mmessage.Message, error) {
func (s *SQLite) GetRumorParamsHelper(tx *sql.Tx, rows *sql.Rows,
mySender string, myRumorID int) (mrumor.ParamsRumor, error) {
var rumorID int
var sender string
var timestampByte []byte

if err := rows.Scan(&rumorID, &sender, &timestampByte); err != nil {
return mrumor.ParamsRumor{}, poperrors.NewDatabaseScanErrorMsg(err.Error())
}

if rumorID == myRumorID && sender == mySender {
return mrumor.ParamsRumor{}, nil
}

var timestamp mrumor.RumorTimestamp
if err := json.Unmarshal(timestampByte, &timestamp); err != nil {
return mrumor.ParamsRumor{}, poperrors.NewInternalServerError("failed to unmarshal timestamp: %v", err)
}

messages := make(map[string][]mmessage.Message)

args := []interface{}{true, sender, rumorID}

err := s.GetMessagesFromRumorHelper(tx, rumorID, args, selectRumorProcessedMessages, messages)
if err != nil {
return mrumor.ParamsRumor{}, err
}

args = []interface{}{sender, rumorID}

err = s.GetMessagesFromRumorHelper(tx, rumorID, args, selectRumorUnprocessedMessages, messages)
if err != nil {
return mrumor.ParamsRumor{}, err
}

return newRumorParams(rumorID, sender, messages, timestamp), nil
}

func (s *SQLite) GetMessagesFromRumorHelper(tx *sql.Tx, rumorID int, args []interface{},
query string, messages map[string][]mmessage.Message) error {

rows, err := tx.Query(selectRumorMessages, true, sender, rumorID)
rows, err := tx.Query(query, args...)
if err != nil {
return nil, poperrors.NewDatabaseSelectErrorMsg("messages from rumor %d: %v", rumorID, err)
return poperrors.NewDatabaseSelectErrorMsg("messages from rumor %d: %v", rumorID, err)
}
defer rows.Close()

messages := make(map[string][]mmessage.Message)
for rows.Next() {
var channelPath string
var messageByte []byte
if err = rows.Scan(&channelPath, &messageByte); err != nil {
return nil, poperrors.NewDatabaseScanErrorMsg(err.Error())
return poperrors.NewDatabaseScanErrorMsg(err.Error())
}
var msg mmessage.Message
if err = json.Unmarshal(messageByte, &msg); err != nil {
return nil, poperrors.NewInternalServerError("failed to unmarshal message of rumor : %v", err)
return poperrors.NewInternalServerError("failed to unmarshal message from rumor %d: %v", rumorID, err)
}
messages[channelPath] = append(messages[channelPath], msg)
}

if err = rows.Err(); err != nil {
return nil, poperrors.NewDatabaseIteratorErrorMsg(err.Error())
return poperrors.NewDatabaseIteratorErrorMsg(err.Error())
}

return messages, nil
return nil
}
11 changes: 10 additions & 1 deletion be1-go/internal/database/sqlite/sqlite_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ const (
WHERE sender = (SELECT publicKey FROM key WHERE channelPath = ?) AND rumorID = (SELECT max(ID) FROM rumor
WHERE sender = (SELECT publicKey FROM key WHERE channelPath = ?)))`

selectRumorMessages = `
selectRumorProcessedMessages = `
SELECT channelPath, message
FROM message JOIN channelMessage ON message.messageID = channelMessage.messageID
WHERE isBaseChannel = ? AND message.messageID IN
Expand All @@ -338,6 +338,15 @@ const (
WHERE sender = ?
AND rumorID = ?)`

selectRumorUnprocessedMessages = `
SELECT channelPath, message
FROM unprocessedMessage
WHERE unprocessedMessage.messageID IN
(SELECT messageID
FROM unprocessedMessageRumor
WHERE sender = ?
AND rumorID = ?)`

selectAllRumors = `SELECT ID, sender, timestamp FROM rumor `

selectMyRumorInfos = `SELECT max(ID), sender FROM rumor WHERE sender = (SELECT publicKey FROM key WHERE channelPath = ?)`
Expand Down
4 changes: 4 additions & 0 deletions be1-go/internal/handler/channel/lao/hlao/lao.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (h *Handler) Handle(channelPath string, msg mmessage.Message) error {
case channel.ElectionObject + "#" + channel.ElectionActionSetup:
storeMessage = false
err = h.handleElectionSetup(msg, channelPath)
case channel.MeetingObject + "#" + channel.MeetingActionCreate:
err = nil
case channel.MeetingObject + "#" + channel.MeetingActionState:
err = nil
default:
err = errors.NewInvalidMessageFieldError("failed to Handle %s#%s, invalid object#action", object, action)
}
Expand Down

0 comments on commit 4ea172c

Please sign in to comment.