Skip to content

Commit

Permalink
cache api candles
Browse files Browse the repository at this point in the history
  • Loading branch information
buck54321 committed Jul 20, 2023
1 parent 9ee16c6 commit 9970db5
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 19 deletions.
5 changes: 2 additions & 3 deletions client/orderbook/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/dex/generics"
"decred.org/dcrdex/dex/msgjson"
"decred.org/dcrdex/dex/order"
)
Expand Down Expand Up @@ -667,9 +668,7 @@ func (ob *OrderBook) AddRecentMatches(matches [][2]int64, ts uint64) []*MatchSum
}

// Put the newest first.
for i, j := 0, len(newMatches)-1; i < j; i, j = i+1, j-1 {
newMatches[i], newMatches[j] = newMatches[j], newMatches[i]
}
generics.ReverseSlice(newMatches)

ob.matchSummaryMtx.Lock()
defer ob.matchSummaryMtx.Unlock()
Expand Down
26 changes: 25 additions & 1 deletion dex/candles/candles.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package candles
import (
"time"

"decred.org/dcrdex/dex/generics"
"decred.org/dcrdex/dex/msgjson"
)

Expand Down Expand Up @@ -68,7 +69,7 @@ func (c *Cache) Add(candle *Candle) {
return
}
c.Candles = append(c.Candles, *candle)
c.cursor = sz // len(c.candles) - 1
c.cursor = sz // len(c.Candles) - 1
}

func (c *Cache) Reset() {
Expand Down Expand Up @@ -181,6 +182,29 @@ func (c *Cache) Last() *Candle {
return &c.Candles[c.cursor]
}

// CompletedCandlesSince returns, in chronological order, the completed
// candles whose end stamps are newer than lastStoredEndStamp. The candle
// for the current (still-accumulating) period is never included.
func (c *Cache) CompletedCandlesSince(lastStoredEndStamp uint64) (cs []*Candle) {
	// The current period's candle is still being built, so any candle whose
	// epoch index reaches currentIdx is partial and must be skipped.
	currentIdx := uint64(time.Now().UnixMilli()) / c.BinSize
	lastStoredIdx := lastStoredEndStamp / c.BinSize

	// Scan the ring buffer newest-to-oldest, starting at the cursor.
	n := len(c.Candles)
	for back := 0; back < n; back++ {
		candle := &c.Candles[(c.cursor+n-back)%n]
		epochIdx := candle.EndStamp / c.BinSize
		if epochIdx >= currentIdx {
			continue // partial candle for the in-progress period
		}
		if epochIdx <= lastStoredIdx {
			break // already stored, and everything older is too
		}
		cs = append(cs, candle)
	}
	// Candles were collected newest-first. Flip to chronological order.
	for l, r := 0, len(cs)-1; l < r; l, r = l+1, r-1 {
		cs[l], cs[r] = cs[r], cs[l]
	}
	return
}

// combineCandles attempts to add the candidate candle to the target candle
// in-place, if they're in the same bin, otherwise returns false.
func (c *Cache) combineCandles(target, candidate *Candle) bool {
Expand Down
10 changes: 10 additions & 0 deletions dex/generics/generics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// This code is available on the terms of the project LICENSE.md file,
// also available online at https://blueoakcouncil.org/license/1.0.0.

package generics

// ReverseSlice reverses the order of the elements of s in place. A nil or
// single-element slice is left unchanged.
func ReverseSlice[T any](s []T) {
	for left, right := 0, len(s)-1; left < right; left, right = left+1, right-1 {
		s[left], s[right] = s[right], s[left]
	}
}
5 changes: 2 additions & 3 deletions dex/networks/zec/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"time"

"decred.org/dcrdex/dex/generics"
"github.com/btcsuite/btcd/wire"
)

Expand Down Expand Up @@ -119,8 +120,6 @@ func readInternalByteOrder(r io.Reader, b []byte) error {
return err
}
// Reverse the bytes
for i, j := 0, len(b)-1; i < j; i, j = i+1, j-1 {
b[i], b[j] = b[j], b[i]
}
generics.ReverseSlice(b)
return nil
}
41 changes: 35 additions & 6 deletions server/apidata/apidata.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
// caches at startup.
type DBSource interface {
LoadEpochStats(base, quote uint32, caches []*candles.Cache) error
LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error)
InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error
}

// MarketSource is a source of market information. Markets are added after
Expand All @@ -43,6 +45,11 @@ type BookSource interface {
Book(mktName string) (*msgjson.OrderBook, error)
}

type cacheWithStoredTime struct {
*candles.Cache
lastStoredEndStamp uint64 // protected by DataAPI.cacheMtx
}

// DataAPI is a data API backend.
type DataAPI struct {
db DBSource
Expand All @@ -53,7 +60,7 @@ type DataAPI struct {
spots map[string]json.RawMessage

cacheMtx sync.RWMutex
marketCaches map[string]map[uint64]*candles.Cache
marketCaches map[string]map[uint64]*cacheWithStoredTime
}

// NewDataAPI is the constructor for a new DataAPI.
Expand All @@ -62,7 +69,7 @@ func NewDataAPI(dbSrc DBSource) *DataAPI {
db: dbSrc,
epochDurations: make(map[string]uint64),
spots: make(map[string]json.RawMessage),
marketCaches: make(map[string]map[uint64]*candles.Cache),
marketCaches: make(map[string]map[uint64]*cacheWithStoredTime),
}

if atomic.CompareAndSwapUint32(&started, 0, 1) {
Expand All @@ -81,18 +88,25 @@ func (s *DataAPI) AddMarketSource(mkt MarketSource) error {
}
epochDur := mkt.EpochDuration()
s.epochDurations[mktName] = epochDur
binCaches := make(map[uint64]*candles.Cache, len(binSizes)+1)
s.marketCaches[mktName] = binCaches
binCaches := make(map[uint64]*cacheWithStoredTime, len(binSizes)+1)
cacheList := make([]*candles.Cache, 0, len(binSizes)+1)
for _, binSize := range append([]uint64{epochDur}, binSizes...) {
cache := candles.NewCache(candles.CacheSize, binSize)
lastCandleEndStamp, err := s.db.LastCandleEndStamp(mkt.Base(), mkt.Quote(), cache.BinSize)
if err != nil {
return fmt.Errorf("LastCandleEndStamp: %w", err)
}
c := &cacheWithStoredTime{cache, lastCandleEndStamp}
cacheList = append(cacheList, cache)
binCaches[binSize] = cache
binCaches[binSize] = c
}
err = s.db.LoadEpochStats(mkt.Base(), mkt.Quote(), cacheList)
if err != nil {
return err
}
s.cacheMtx.Lock()
s.marketCaches[mktName] = binCaches
s.cacheMtx.Unlock()
return nil
}

Expand Down Expand Up @@ -120,7 +134,7 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche
epochDur := s.epochDurations[mktName]
startStamp := epochIdx * epochDur
endStamp := startStamp + epochDur
var cache5min *candles.Cache
var cache5min *cacheWithStoredTime
const fiveMins = uint64(time.Minute * 5 / time.Millisecond)
candle := &candles.Candle{
StartStamp: startStamp,
Expand All @@ -137,6 +151,21 @@ func (s *DataAPI) ReportEpoch(base, quote uint32, epochIdx uint64, stats *matche
cache5min = cache
}
cache.Add(candle)

// Check if any candles need to be inserted.
// Don't insert epoch candles.
if cache.BinSize == epochDur {
continue
}

newCandles := cache.CompletedCandlesSince(cache.lastStoredEndStamp)
if len(newCandles) == 0 {
continue
}
if err := s.db.InsertCandles(base, quote, cache.BinSize, newCandles); err != nil {
return 0, 0, 0, 0, err
}
cache.lastStoredEndStamp = newCandles[len(newCandles)-1].EndStamp
}
if cache5min == nil {
return 0, 0, 0, 0, fmt.Errorf("no 5 minute cache")
Expand Down
8 changes: 8 additions & 0 deletions server/apidata/apidata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func (db *TDBSource) LoadEpochStats(base, quote uint32, caches []*candles.Cache)
return db.loadEpochErr
}

// LastCandleEndStamp is part of the apidata.DBSource interface. This test
// stub always reports a zero end stamp (i.e. no candles stored yet) with
// no error.
func (db *TDBSource) LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error) {
	return 0, nil
}

// InsertCandles is part of the apidata.DBSource interface. This test stub
// discards the candles and reports success.
func (db *TDBSource) InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error {
	return nil
}

type TBookSource struct {
book *msgjson.OrderBook
}
Expand Down
125 changes: 120 additions & 5 deletions server/db/driver/pg/epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"database/sql/driver"
"errors"
"fmt"
"math"
"time"

"decred.org/dcrdex/dex/candles"
Expand Down Expand Up @@ -112,13 +113,33 @@ func (a *Archiver) LoadEpochStats(base, quote uint32, caches []*candles.Cache) e
ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
defer cancel()

var oldestNeeded uint64 = math.MaxUint64
sinceCaches := make(map[uint64]*candles.Cache, 0) // maps oldest end stamp
now := uint64(time.Now().UnixMilli())
for _, cache := range caches {
since := now - (cache.BinSize * candles.CacheSize)
since = since - since%cache.BinSize // truncate to first end stamp of the epoch
if err = a.loadCandles(base, quote, cache, since); err != nil {
return fmt.Errorf("loadCandles: %w", err)
}
// If we have candles, move our since value up to the next expected epoch stamp.
if len(cache.Candles) > 0 {
idx := cache.Last().EndStamp / cache.BinSize
since = (idx + 1) * cache.BinSize
}
if since < oldestNeeded {
oldestNeeded = since
}
sinceCaches[since] = cache
}

tstart := time.Now()
defer func() { log.Debugf("select epoch candles in: %v", time.Since(tstart)) }()

stmt := fmt.Sprintf(internal.SelectEpochCandles, epochReportsTableName)
rows, err := a.db.QueryContext(ctx, stmt, 0)
rows, err := a.db.QueryContext(ctx, stmt, oldestNeeded) // +1 because candles aren't stored until the end stamp is surpassed.
if err != nil {
return err
return fmt.Errorf("SelectEpochCandles: %w", err)
}

defer rows.Close()
Expand All @@ -127,7 +148,7 @@ func (a *Archiver) LoadEpochStats(base, quote uint32, caches []*candles.Cache) e
for rows.Next() {
err = rows.Scan(&endStamp, &epochDur, &matchVol, &quoteVol, &highRate, &lowRate, &startRate, &endRate)
if err != nil {
return err
return fmt.Errorf("Scan: %w", err)
}
candle := &candles.Candle{
StartStamp: uint64(endStamp - epochDur),
Expand All @@ -139,10 +160,104 @@ func (a *Archiver) LoadEpochStats(base, quote uint32, caches []*candles.Cache) e
StartRate: uint64(startRate),
EndRate: uint64(endRate),
}
for _, set := range caches {
set.Add(candle)
for since, cache := range sinceCaches {
if uint64(endStamp) > since {
cache.Add(candle)
}
}
}

return rows.Err()
}

// LastCandleEndStamp retrieves the end stamp of the most recently stored
// candle of the specified duration for the given market. A market with no
// stored candles of that duration reports zero with no error.
func (a *Archiver) LastCandleEndStamp(base, quote uint32, candleDur uint64) (uint64, error) {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return 0, err
	}
	tableName := fullCandlesTableName(a.dbName, marketSchema)

	ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
	defer cancel()

	var endStamp fastUint64
	stmt := fmt.Sprintf(internal.SelectLastEndStamp, tableName)
	err = a.db.QueryRowContext(ctx, stmt, candleDur).Scan(&endStamp)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No candles stored yet for this market/duration.
		return 0, nil
	case err != nil:
		return 0, err
	}
	return uint64(endStamp), nil
}

// InsertCandles stores a batch of candles of the specified duration for
// the given market, one insert per candle. The first failed insert aborts
// the batch and is reported as a fatal backend error.
func (a *Archiver) InsertCandles(base, quote uint32, dur uint64, cs []*candles.Candle) error {
	marketSchema, err := a.marketSchema(base, quote)
	if err != nil {
		return err
	}
	stmt := fmt.Sprintf(internal.InsertCandle, fullCandlesTableName(a.dbName, marketSchema))

	// A helper scopes each insert's context cancellation to that insert,
	// rather than deferring every cancel to function return.
	storeOne := func(c *candles.Candle) error {
		ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
		defer cancel()

		if _, err := a.db.ExecContext(ctx, stmt,
			c.EndStamp, dur, c.MatchVolume,
			c.QuoteVolume, c.HighRate, c.LowRate, c.StartRate, c.EndRate,
		); err != nil {
			a.fatalBackendErr(err)
			return err
		}
		return nil
	}

	for _, c := range cs {
		if err := storeOne(c); err != nil {
			return err
		}
	}
	return nil
}

func (a *Archiver) loadCandles(base, quote uint32, cache *candles.Cache, since uint64) error {
marketSchema, err := a.marketSchema(base, quote)
if err != nil {
return err
}
tableName := fullCandlesTableName(a.dbName, marketSchema)
stmt := fmt.Sprintf(internal.SelectCandles, tableName)

ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
defer cancel()

dur := cache.BinSize

rows, err := a.db.QueryContext(ctx, stmt, dur, since)

Check failure on line 239 in server/db/driver/pg/epochs.go

View workflow job for this annotation

GitHub Actions / Go CI (1.19)

rows.Err must be checked (rowserrcheck)

Check failure on line 239 in server/db/driver/pg/epochs.go

View workflow job for this annotation

GitHub Actions / Go CI (1.19)

rows.Err must be checked (rowserrcheck)
if err != nil {
return fmt.Errorf("QueryContext: %w", err)
}
defer rows.Close()

var endStamp, matchVol, quoteVol, highRate, lowRate, startRate, endRate fastUint64
for rows.Next() {
err = rows.Scan(&endStamp, &matchVol, &quoteVol, &highRate, &lowRate, &startRate, &endRate)
if err != nil {
return fmt.Errorf("Scan: %w", err)
}
cache.Add(&candles.Candle{
StartStamp: uint64(endStamp) - dur,
EndStamp: uint64(endStamp),
MatchVolume: uint64(matchVol),
QuoteVolume: uint64(quoteVol),
HighRate: uint64(highRate),
LowRate: uint64(lowRate),
StartRate: uint64(startRate),
EndRate: uint64(endRate),
})
}
return nil
}
Loading

0 comments on commit 9970db5

Please sign in to comment.