Skip to content

Commit

Permalink
Merge pull request #1659 from c9s/c9s/delayed-trade-sync
Browse files Browse the repository at this point in the history
FEATURE: [core] add syncBufferPeriod config and set default to -30 mins
  • Loading branch information
c9s authored Jun 19, 2024
2 parents 6b6bf2f + b2722d9 commit 26d0fed
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 11 deletions.
2 changes: 2 additions & 0 deletions pkg/bbgo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ type EnvironmentConfig struct {
DisableMarketDataStore bool `json:"disableMarketDataStore"`

MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"`

SyncBufferPeriod *types.Duration `json:"syncBufferPeriod"`
}

type Config struct {
Expand Down
14 changes: 13 additions & 1 deletion pkg/bbgo/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

var defaultSyncBufferPeriod = 30 * time.Minute

// IsBackTesting is a global variable that indicates the current environment is back-test or not.
var IsBackTesting = false

Expand Down Expand Up @@ -645,7 +647,17 @@ func (environ *Environment) syncSession(

log.Infof("syncing symbols %v from session %s", symbols, session.Name)

return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, syncStartTime, symbols...)
syncBufferPeriod := -defaultSyncBufferPeriod
if environ.environmentConfig.SyncBufferPeriod != nil {
syncBufferPeriod = -environ.environmentConfig.SyncBufferPeriod.Duration()
}

if syncBufferPeriod > 0 {
log.Warnf("syncBufferPeriod should be a negative number, given: %d", syncBufferPeriod)
}

syncEndTime := time.Now().Add(syncBufferPeriod)
return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, syncStartTime, syncEndTime, symbols...)
}

func (environ *Environment) ConfigureNotificationSystem(ctx context.Context, userConfig *Config) error {
Expand Down
7 changes: 5 additions & 2 deletions pkg/service/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ type OrderService struct {
DB *sqlx.DB
}

func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
func (s *OrderService) Sync(
ctx context.Context, exchange types.Exchange, symbol string,
startTime, endTime time.Time,
) error {
isMargin, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
Expand Down Expand Up @@ -77,7 +80,7 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol
}

for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil {
return err
}
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/service/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ type SyncService struct {

// SyncSessionSymbols syncs the trades from the given exchange session
func (s *SyncService) SyncSessionSymbols(
ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string,
ctx context.Context, exchange types.Exchange,
startTime, endTime time.Time,
symbols ...string,
) error {
markets, err := cache.LoadExchangeMarketsWithCache(ctx, exchange)
if err != nil {
Expand All @@ -41,12 +43,12 @@ func (s *SyncService) SyncSessionSymbols(
}

log.Infof("syncing %s %s trades from %s...", exchange.Name(), symbol, startTime)
if err := s.TradeService.Sync(ctx, exchange, symbol, startTime); err != nil {
if err := s.TradeService.Sync(ctx, exchange, symbol, startTime, endTime); err != nil {
return err
}

log.Infof("syncing %s %s orders from %s...", exchange.Name(), symbol, startTime)
if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil {
if err := s.OrderService.Sync(ctx, exchange, symbol, startTime, endTime); err != nil {
return err
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/service/sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ type SyncTask struct {
LogInsert bool
}

func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time, args ...time.Time) error {
func (sel SyncTask) execute(
ctx context.Context,
db *sqlx.DB, startTime time.Time, endTimeArgs ...time.Time,
) error {
batchBufferRefVal := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer)

// query from db
Expand Down Expand Up @@ -84,8 +87,8 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
startTime = lastRecordTime(sel, recordSliceRef, startTime)

endTime := time.Now()
if len(args) > 0 {
endTime = args[0]
if len(endTimeArgs) > 0 {
endTime = endTimeArgs[0]
}

// asset "" means all assets
Expand Down
8 changes: 6 additions & 2 deletions pkg/service/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func NewTradeService(db *sqlx.DB) *TradeService {
return &TradeService{db}
}

func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
func (s *TradeService) Sync(
ctx context.Context,
exchange types.Exchange, symbol string,
startTime, endTime time.Time,
) error {
isMargin, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
Expand Down Expand Up @@ -106,7 +110,7 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
}

for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil {
return err
}
}
Expand Down

0 comments on commit 26d0fed

Please sign in to comment.