Skip to content

Commit

Permalink
update readMsg logic
Browse files Browse the repository at this point in the history
  • Loading branch information
showurl committed Feb 19, 2023
1 parent 7502a1a commit 8ba8a9f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 10 deletions.
3 changes: 3 additions & 0 deletions app/msg/internal/logic/pushMsgListLogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (l *PushMsgListLogic) offlinePushMsgList(list *pb.MsgDataList, userIds []st
func (l *PushMsgListLogic) batchFindAndPushOfflineMsgList(ctx context.Context, listMap map[string]*pb.MsgDataList) {
for convId, msgDataList := range listMap {
for _, data := range msgDataList.MsgDataList {
if !data.GetOptions().GetOfflinePush() {
continue
}
if data.IsSingleConv() {
// 单聊
receiver := data.ReceiverUid()
Expand Down
23 changes: 13 additions & 10 deletions app/msg/internal/logic/readMsgLogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ type ReadMsgTask struct {
errChan chan error
}

func (l *ReadMsgLogic) readMsgTask() {
ticker := time.NewTicker(l.svcCtx.ConfigMgr.ReadMsgTaskInterval(l.ctx))
func (l *ReadMsgLogic) readMsgTask(size int) {
interval := l.svcCtx.ConfigMgr.ReadMsgTaskInterval(l.ctx)
l.Infof("readMsgTask interval: %d, size: %d", interval/time.Millisecond, size)
ticker := time.NewTicker(interval)
for {
select {
case <-ticker.C:
notices, errChans := l.popReadMsgTask(l.svcCtx.ConfigMgr.ReadMsgTaskBatchSize(l.ctx))
notices, errChans := l.popReadMsgTask(size)
if len(notices) == 0 {
continue
}
Expand All @@ -45,13 +47,14 @@ var singleReadMsgLogic *ReadMsgLogic
func NewReadMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ReadMsgLogic {
ctx = context.Background()
if singleReadMsgLogic == nil {
size := svcCtx.ConfigMgr.ReadMsgTaskBatchSize(ctx)
singleReadMsgLogic = &ReadMsgLogic{
ctx: ctx,
svcCtx: svcCtx,
readMsgTasks: make(chan *ReadMsgTask, 0),
readMsgTasks: make(chan *ReadMsgTask, size),
Logger: logx.WithContext(ctx),
}
go singleReadMsgLogic.readMsgTask()
go singleReadMsgLogic.readMsgTask(size)
}
return singleReadMsgLogic
}
Expand Down Expand Up @@ -107,20 +110,20 @@ func (l *ReadMsgLogic) readMsg(in *pb.ReadMsgReq) (*pb.ReadMsgResp, error) {
}

func (l *ReadMsgLogic) popReadMsgTask(num int) ([]*noticemodel.Notice, []chan error) {
if len(l.readMsgTasks) == 0 {
length := len(l.readMsgTasks)
//l.Debugf("popReadMsgTask length: %d, num: %d", length, num)
if length == 0 {
return nil, nil
}
var (
tasks []*ReadMsgTask
notices []*noticemodel.Notice
errChans []chan error
)
if len(l.readMsgTasks) < num {
num = len(l.readMsgTasks)
if length < num {
num = length
}
for i := 0; i < num; i++ {
task := <-l.readMsgTasks
tasks = append(tasks, task)
notice := &noticemodel.Notice{
ConvId: pb.HiddenConvId(task.readMsgReq.ConvId),
Options: noticemodel.NoticeOption{
Expand Down
15 changes: 15 additions & 0 deletions common/xtdmq/tdmq_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/zeromicro/go-zero/core/logx"
"go.opentelemetry.io/otel/propagation"
oteltrace "go.opentelemetry.io/otel/trace"
"math"
"strconv"
"time"
)
Expand Down Expand Up @@ -46,6 +47,7 @@ type TDMQProducer struct {
ProducerConfig TDMQProducerConfig
producer pulsar.Producer
client pulsar.Client
produceTimes int
}

func NewTDMQProducer(config TDMQConfig, producerConfig TDMQProducerConfig) *TDMQProducer {
Expand Down Expand Up @@ -75,6 +77,11 @@ func (p *TDMQProducer) init() {
Topic: p.ProducerConfig.TopicName,
Name: p.ProducerConfig.GetProducerName(),
SendTimeout: time.Duration(p.ProducerConfig.SendTimeout) * time.Millisecond,
MessageRouter: func(message *pulsar.ProducerMessage, metadata pulsar.TopicMetadata) int {
partitions := metadata.NumPartitions()
i := p.incrTimes() % int(partitions)
return i
},
})
if err != nil {
logx.Errorf("Could not instantiate Pulsar producer: %v", err)
Expand All @@ -83,6 +90,14 @@ func (p *TDMQProducer) init() {
p.producer = producer
}

func (p *TDMQProducer) incrTimes() int {
if p.produceTimes >= math.MaxInt {
p.produceTimes = 0
}
p.produceTimes++
return p.produceTimes
}

func (p *TDMQProducer) Produce(
ctx context.Context,
key string,
Expand Down

0 comments on commit 8ba8a9f

Please sign in to comment.