Skip to content

Commit

Permalink
Revert "Dev (#71)"
Browse files Browse the repository at this point in the history
This reverts commit ee91271.
  • Loading branch information
sandyskies committed Nov 16, 2018
1 parent 726fd7f commit 05d4a5b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
13 changes: 8 additions & 5 deletions tars/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func (c *AdapterProxy) ParsePackage(buff []byte) (int, int) {

// Recv : Recover read channel when closed for timeout
func (c *AdapterProxy) Recv(pkg []byte) {
defer func() {
// TODO readCh has a certain probability to be closed after the load, and we need to recover
// Maybe there is a better way
if err := recover(); err != nil {
TLOG.Error("recv pkg painc:", err)
}
}()
packet := requestf.ResponsePacket{}
err := packet.ReadFrom(codec.NewReader(pkg))
if err != nil {
Expand All @@ -65,11 +72,7 @@ func (c *AdapterProxy) Recv(pkg []byte) {
if ok {
ch := chIF.(chan *requestf.ResponsePacket)
TLOG.Debug("IN:", packet)
select {
case ch <- &packet:
default:
close(ch)
}
ch <- &packet
} else {
TLOG.Error("timeout resp,drop it:", packet.IRequestId)
}
Expand Down
3 changes: 2 additions & 1 deletion tars/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ func (obj *ObjectProxy) Invoke(ctx context.Context, msg *Message, timeout time.D
}
msg.Adp = adp
atomic.AddInt32(&obj.queueLen, 1)
readCh := make(chan *requestf.ResponsePacket)
readCh := make(chan *requestf.ResponsePacket, 1)
adp.resp.Store(msg.Req.IRequestId, readCh)
defer func() {
checkPanic()
atomic.AddInt32(&obj.queueLen, -1)
adp.resp.Delete(msg.Req.IRequestId)
close(readCh)
}()
if err := adp.Send(msg.Req); err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions tars/util/rogger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ func init() {
currDateDay = now.Format("20060102")
go func() {
tm := time.NewTimer(time.Second)
if err := recover(); err != nil { // avoid timer panic
}
for {
now := time.Now()
d := time.Second - time.Duration(now.Nanosecond())
Expand Down

0 comments on commit 05d4a5b

Please sign in to comment.