Skip to content

Commit

Permalink
match checksum to separate mutex and add dc cancel tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 committed Sep 17, 2024
1 parent ae2fbd0 commit 08b17cb
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 65 deletions.
75 changes: 57 additions & 18 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ type dexConnection struct {
// processed by a dex server.
inFlightOrders map[uint64]*InFlightOrder

// A map linking cancel order IDs to trade order IDs.
cancelsMtx sync.RWMutex
cancels map[order.OrderID]order.OrderID

blindCancelsMtx sync.Mutex
blindCancels map[order.OrderID]order.Preimage

Expand Down Expand Up @@ -253,6 +257,25 @@ func (dc *dexConnection) bondAssets() (map[uint32]*BondAsset, uint64) {
return bondAssets, cfg.BondExpiry
}

func (dc *dexConnection) registerCancelLink(cid, oid order.OrderID) {
dc.cancelsMtx.Lock()
dc.cancels[cid] = oid
dc.cancelsMtx.Unlock()
}

func (dc *dexConnection) deleteCancelLink(cid order.OrderID) {
dc.cancelsMtx.Lock()
delete(dc.cancels, cid)
dc.cancelsMtx.Unlock()
}

func (dc *dexConnection) cancelTradeID(cid order.OrderID) (order.OrderID, bool) {
dc.cancelsMtx.RLock()
defer dc.cancelsMtx.RUnlock()
oid, found := dc.cancels[cid]
return oid, found
}

// marketConfig is the market's configuration, as returned by the server in the
// 'config' response.
func (dc *dexConnection) marketConfig(mktID string) *msgjson.Market {
Expand Down Expand Up @@ -584,10 +607,12 @@ func (dc *dexConnection) findOrder(oid order.OrderID) (tracker *trackedTrade, pr
if tracker, found := dc.trades[oid]; found {
return tracker, tracker.preImg, false
}
// Search the cancel order IDs.
for _, tracker := range dc.trades {
if tracker.cancel != nil && tracker.cancel.ID() == oid {
return tracker, tracker.cancel.preImg, true

if tid, found := dc.cancelTradeID(oid); found {
if tracker, found := dc.trades[tid]; found {
return tracker, tracker.preImg, true
} else {
dc.log.Errorf("Did not find trade for cancel order ID %s", oid)
}
}
return
Expand Down Expand Up @@ -8087,6 +8112,7 @@ func (c *Core) newDEXConnection(acctInfo *db.AccountInfo, flag connectDEXFlag) (
ticker: newDexTicker(defaultTickInterval), // updated when server config obtained
books: make(map[string]*bookie),
trades: make(map[order.OrderID]*trackedTrade),
cancels: make(map[order.OrderID]order.OrderID),
inFlightOrders: make(map[uint64]*InFlightOrder),
blindCancels: make(map[order.OrderID]order.Preimage),
apiVer: -1,
Expand Down Expand Up @@ -8899,7 +8925,7 @@ func handlePreimageRequest(c *Core, dc *dexConnection, msg *msgjson.Message) err
}

if len(req.Commitment) != order.CommitmentSize {
return fmt.Errorf("received preimage request for %v with no corresponding order submission response.", oid)
return fmt.Errorf("received preimage request for %s with no corresponding order submission response", oid)
}

// See if we recognize that commitment, and if we do, just wait for the
Expand Down Expand Up @@ -8992,15 +9018,14 @@ func acceptCsum(tracker *trackedTrade, isCancel bool, commitChecksum dex.Bytes)
// Do not allow csum to be changed once it has been committed to
// (initialized to something other than `nil`) because it is probably a
// malicious behavior by the server.
tracker.mtx.Lock()
defer tracker.mtx.Unlock()

tracker.csumMtx.Lock()
defer tracker.csumMtx.Unlock()
if isCancel {
if tracker.cancel.csum == nil {
tracker.cancel.csum = commitChecksum
if tracker.cancelCsum == nil {
tracker.cancelCsum = commitChecksum
return true
}
return bytes.Equal(commitChecksum, tracker.cancel.csum)
return bytes.Equal(commitChecksum, tracker.cancelCsum)
}
if tracker.csum == nil {
tracker.csum = commitChecksum
Expand Down Expand Up @@ -10721,25 +10746,39 @@ func (c *Core) checkEpochResolution(host string, mktID string) {
}
currentEpoch := dc.marketEpoch(mktID, time.Now())
lastEpoch := currentEpoch - 1

// Short path if we're already resolved.
dc.epochMtx.RLock()
resolvedEpoch := dc.resolvedEpoch[mktID]
dc.epochMtx.RUnlock()
if lastEpoch == resolvedEpoch {
return
}

ts, inFlights := dc.marketTrades(mktID)
for _, ord := range inFlights {
if ord.Epoch == lastEpoch {
return
}
}
for _, t := range ts {
// Is this order from the last epoch and still not booked or executed?
if t.epochIdx() == lastEpoch && t.status() == order.OrderStatusEpoch {
return
}
if t.cancel != nil && t.cancelEpochIdx() == lastEpoch {
t.mtx.RLock()
matched := t.cancel.matches.taker != nil
t.mtx.RUnlock()
if !matched {
return
}
// Does this order have an in-flight cancel order that is not yet
// resolved?
t.mtx.RLock()
unresolvedCancel := t.cancel != nil && t.cancelEpochIdx() == lastEpoch && t.cancel.matches.taker == nil
t.mtx.RUnlock()
if unresolvedCancel {
return
}
}

// We don't have any unresolved orders or cancel orders from the last epoch.
// Just make sure that not other thread has resolved the epoch and then send
// the notification.
dc.epochMtx.Lock()
sendUpdate := lastEpoch > dc.resolvedEpoch[mktID]
dc.resolvedEpoch[mktID] = lastEpoch
Expand Down
Loading

0 comments on commit 08b17cb

Please sign in to comment.