Skip to content

Commit

Permalink
添加定时任务
Browse files Browse the repository at this point in the history
  • Loading branch information
oldfritter committed Sep 8, 2019
1 parent a28ca8f commit b9c7878
Show file tree
Hide file tree
Showing 14 changed files with 477 additions and 21 deletions.
4 changes: 4 additions & 0 deletions config/aws_s3.yml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
S3_BACKUP_BUCKET:
AWS_ACCESS_KEY_ID:
AWS_SECRET_ACCESS_KEY:
AWS_REGION:
2 changes: 2 additions & 0 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Env struct {
AppName string `yaml:"app_name"`
LicenseKey string `yaml:"license_key"`
} `yaml:"newrelic"`

Schedules []string `yaml:"schedules"`
}

var CurrentEnv Env
Expand Down
3 changes: 3 additions & 0 deletions config/qiniu.yml.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
access_key: P4i11d2V8IpsBIkvQPlmUWbt-72UY-fLdY9WaWHa
secret_key: wgU5keLT1nyNQ4cIUIn9XDwtOP2oZvOMJyxhkYS-
backup_bucket: backup
4 changes: 4 additions & 0 deletions models/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func InitAllMarkets(db *utils.GormDB) {
db.Where("visible = ?", true).Find(&Markets)
}

func FindAllMarket() []Market {
return Markets
}

// Exchange
func (assignment *Market) MatchingExchange() string {
return utils.AmqpGlobalConfig.Exchange.Matching["key"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package backup
package tasks

import (
"fmt"
"time"

"github.com/oldfritter/goDCE/config"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
)

func BackupAccountVersions() {
config.InitEnv()
utils.InitMainDB()
utils.InitBackupDB()
AutoMigrations()

mainDB := utils.MainDbBegin()
defer mainDB.DbRollback()
var first, last AccountVersion
Expand Down Expand Up @@ -58,10 +52,6 @@ func BackupAccountVersions() {
<-quit
fmt.Println("quiting...")
time.Sleep(10 * time.Second)
// defer close(c)
// defer close(quit)
utils.CloseMainDB()
utils.CloseBackupDB()
return

}
Expand Down
96 changes: 96 additions & 0 deletions schedules/backup/tasks/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package tasks

import (
"fmt"
"os/exec"
"path/filepath"
"time"

"github.com/oldfritter/goDCE/utils"
)

type MyPutRet struct {
Key string
Hash string
Fsize int
Bucket string
Name string
}

var (
logNames = []string{"api", "workers", "schedule"}
)

func UploadLogFileToQiniu() {
UploadLogFileToQiniuByDay(time.Now().Add(-time.Hour * 24))
UploadLogFileToQiniuByDay(time.Now())
}

func UploadLogFileToS3() {
UploadLogFileToS3ByDay(time.Now().Add(-time.Hour * 24))
UploadLogFileToS3ByDay(time.Now())
}

func UploadLogFileToQiniuByDay(day time.Time) {
utils.InitQiniuConfig()
for _, logName := range logNames {
gzFile := "/tmp/panama/" + logName + day.Format("2006-01-02") + ".tar.gz"

name, _ := exec.Command("sh", "-c", "hostname").Output()
hostname := string(name)

exec.Command("sh", "-c", "mkdir -p /tmp/panama/").Output()
exec.Command("sh", "-c", "tar -czvf "+gzFile+" "+"logs/"+logName+day.Format("2006-01-02")+".log").Output()

key := "logs/panama/" + day.Format("01/02") + "/" + logName + "/" + hostname + ".tar.gz"

err := utils.UploadFileToQiniu(utils.QiniuConfig["backup_bucket"], key, gzFile)
if err != nil {
fmt.Println("err: ", err)
}
exec.Command("sh", "-c", "rm -rf "+gzFile).Output()

}
}

func UploadLogFileToS3ByDay(day time.Time) {
utils.InitAwsS3Config()
for _, logName := range logNames {
gzFile := "/tmp/panama/" + logName + day.Format("2006-01-02") + ".tar.gz"

name, _ := exec.Command("sh", "-c", "hostname").Output()
hostname := string(name)

exec.Command("sh", "-c", "mkdir -p /tmp/panama/").Output()
exec.Command("sh", "-c", "tar -czvf "+gzFile+" "+"logs/"+logName+day.Format("2006-01-02")+".log").Output()

key := "logs/panama/" + day.Format("01/02") + "/" + logName + "/" + hostname + ".tar.gz"

err := utils.UploadFileToS3(utils.S3Config["S3_BACKUP_BUCKET"], key, gzFile)
if err != nil {
fmt.Println("err: ", err)
}
exec.Command("sh", "-c", "rm -rf "+gzFile).Output()

}
}
func BackupLogFiles() {
for _, logName := range logNames {
a, _ := filepath.Abs("logs/" + logName + ".log")
b, _ := filepath.Abs("logs/" + logName + time.Now().Format("2006-01-02") + ".log")
exec.Command("sh", "-c", "cat "+fmt.Sprintf(a)+" >> "+fmt.Sprintf(b)).Output()
exec.Command("sh", "-c", "echo '\n' > "+fmt.Sprintf(a)).Output()
}
}

func CleanLogs() {
for _, logName := range logNames {
day := 2
for day < 10 {
str := time.Now().Add(-time.Hour * 24 * time.Duration(day)).Format("2006-01-02")
b, _ := filepath.Abs("logs/" + logName + str + ".log")
exec.Command("sh", "-c", "rm -rf "+fmt.Sprintf(b)).Output()
day += 1
}
}
}
18 changes: 18 additions & 0 deletions schedules/backup/tasks/tokens.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package tasks

import (
"time"

. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
)

func CleanTokens() {
utils.InitMainDB()
db := utils.MainDbBegin()
defer db.DbRollback()

db.Where("expire_at < ?", time.Now().Add(-time.Hour*8)).Delete(Token{})
db.DbCommit()
utils.CloseMainDB()
}
9 changes: 0 additions & 9 deletions schedules/backup_account_version.go

This file was deleted.

42 changes: 42 additions & 0 deletions schedules/kLine/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package kLine

import (
"encoding/json"
"fmt"
"time"

. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
"github.com/streadway/amqp"
)

func CreateLatestKLine() {

markets := FindAllMarket()
for _, market := range markets {
periods := []int64{1, 5, 15, 30, 60, 120, 240, 360, 720, 1440, 4320, 10080}
for _, period := range periods {
payload := struct {
MarketId int `json:"market_id"`
Timestamp int64 `json:"timestamp"`
Period int64 `json:"period"`
DataSource string `json:"data_source"`
}{
MarketId: market.Id,
Timestamp: time.Now().Unix(),
Period: period,
DataSource: "db",
}
b, err := json.Marshal(payload)
if err != nil {
fmt.Println("error:", err)
}
err = utils.PublishMessageWithRouteKey("goDCE.default", "goDCE.k", "text/plain", &b, amqp.Table{}, amqp.Persistent)
if err != nil {
fmt.Println("{ error:", err, "}")
panic(err)
}
}
}

}
76 changes: 76 additions & 0 deletions schedules/order/waitingCheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package order

import (
"encoding/json"
"fmt"
"time"

"github.com/gomodule/redigo/redis"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
)

func WaitingOrderCheck() {
db := utils.MainDbBegin()
defer db.DbRollback()

for _, market := range Markets {
ordersPerMarket(db, &market)
}
db.DbCommit()
}

func ordersPerMarket(db *utils.GormDB, market *Market) {
dataRedis := utils.GetRedisConn("data")
defer dataRedis.Close()

var orderBook struct {
AskIds []int `json:"ask_ids"`
BidIds []int `json:"bid_ids"`
}
key := "goDCE:order_book:" + (*market).Code
values, _ := redis.String(dataRedis.Do("GET", key))
json.Unmarshal([]byte(values), &orderBook)

var orders []Order
if !db.Where("id not in (?)", orderBook.AskIds).
Where("type = ?", "OrderAsk").
Where("market_id = ?", (*market).Code).
Where("state = ?", 100).
Where("created_at < ?", time.Now().Add(-time.Second*10)).
Find(&orders).RecordNotFound() {
var ids []int
for _, order := range orders {
ids = append(ids, order.Id)
}
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "--WaitingOrderCheck orders: ", ids)
}
// for _, order := range orders {
// if order.CreatedAt.Before(time.Now().Add(-time.Hour * 24)) {
// order.State = 0
// db.Save(&order)
// } else {
// order.PushMessageToMatching("submit")
// }
// }
if !db.Where("id not in (?)", orderBook.BidIds).
Where("type = ?", "OrderBid").
Where("market_id = ?", (*market).Code).
Where("state = ?", 100).
Where("created_at < ?", time.Now().Add(-time.Second*10)).
Find(&orders).RecordNotFound() {
var ids []int
for _, order := range orders {
ids = append(ids, order.Id)
}
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "--WaitingOrderCheck orders: ", ids)
}
// for _, order := range orders {
// if order.CreatedAt.Before(time.Now().Add(-time.Hour * 24)) {
// order.State = 0
// db.Save(&order)
// } else {
// order.PushMessageToMatching("submit")
// }
// }
}
Loading

0 comments on commit b9c7878

Please sign in to comment.