Skip to content

Commit 5316679

Browse files
committed
opt: divide each thread and make them fetch task info individualy
1 parent 6f0b05a commit 5316679

File tree

6 files changed

+81
-27
lines changed

6 files changed

+81
-27
lines changed

api/api.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
"log"
77
"net/http"
88
"strconv"
9+
"sync"
910
)
1011

1112
//ApiInfo Api请求配置
1213
type ApiInfo struct {
1314
TOKEN string
1415
APIURL string
1516
BASEPATH string
17+
Lock *sync.Mutex
1618
}
1719

1820
//APIResponse API响应结果
@@ -31,6 +33,11 @@ func (apiInfo *ApiInfo) GetTaskList(num int) string {
3133
return apiInfo.apiGet("getList?num=" + strconv.Itoa(num))
3234
}
3335

36+
//CancelUploadSession 取消分片上传
37+
func (apiInfo *ApiInfo) CancelUploadSession(url string) {
38+
apiInfo.apiDelete(url)
39+
}
40+
3441
//GetPolicy 获取上传策略详情
3542
func (apiInfo *ApiInfo) GetPolicy(id int) string {
3643
return apiInfo.apiGet("getPolicy?id=" + strconv.Itoa(id))
@@ -50,10 +57,38 @@ func (apiInfo *ApiInfo) SetSuccess(id int) string {
5057
return ""
5158
}
5259

60+
//SetError 设置为失败状态
5361
func (apiInfo *ApiInfo) SetError(id int) string {
5462
return apiInfo.apiGet("setError?id=" + strconv.Itoa(id))
5563
}
5664

65+
//apiDelete 发送DELETE请求
66+
67+
func (apiInfo *ApiInfo) apiDelete(controller string) string {
68+
client := &http.Client{}
69+
request, err := http.NewRequest("DELETE", controller, nil)
70+
if err != nil {
71+
log.Printf("[ERROR] Failed to create DELETE requetst, #%v ", err)
72+
}
73+
request.Header.Set("Authorization", "Bearer "+apiInfo.TOKEN)
74+
response, err := client.Do(request)
75+
if err != nil {
76+
log.Printf("[ERROR] Failed to send GET requetst, #%v ", err)
77+
return ""
78+
}
79+
defer response.Body.Close()
80+
if response.StatusCode == 200 || response.StatusCode == 204 {
81+
r, err := ioutil.ReadAll(response.Body)
82+
if err != nil {
83+
log.Printf("[ERROR] Failed to get GET requetst body, #%v ", err)
84+
}
85+
return string(r)
86+
} else {
87+
log.Printf("[ERROR] Failed to get respond, HTTP ERROR %v ", response.StatusCode)
88+
return ""
89+
}
90+
}
91+
5792
//apiGet 发送GET请求
5893
func (apiInfo *ApiInfo) apiGet(controller string) string {
5994
client := &http.Client{}

conf.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
token: 52121225
22
api: http://127.0.0.1/Queue
33
taskNum: 4
4-
threadNum: 4
4+
Duration: 1

main.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"fmt"
77
"io/ioutil"
88
"log"
9+
"sync"
10+
"time"
911

1012
"./api"
1113
"./task"
@@ -14,9 +16,10 @@ import (
1416
)
1517

1618
type taskConfig struct {
17-
TOKEN string `yaml:"token"`
18-
APIURL string `yaml:"api"`
19-
TASKNUM int `yaml:"taskNum"`
19+
TOKEN string `yaml:"token"`
20+
APIURL string `yaml:"api"`
21+
TASKNUM int `yaml:"taskNum"`
22+
DURATION int `yaml:"Duration"`
2023
}
2124

2225
func (c *taskConfig) getConf() (*taskConfig, error) {
@@ -42,23 +45,48 @@ func main() {
4245
var config taskConfig
4346
_, err := config.getConf()
4447
if err == nil {
48+
4549
log.Printf("[INFO] Config information: %v ", config)
46-
api := api.ApiInfo{TOKEN: config.TOKEN, APIURL: config.APIURL}
50+
api := api.ApiInfo{
51+
TOKEN: config.TOKEN,
52+
APIURL: config.APIURL,
53+
Lock: new(sync.Mutex),
54+
}
4755
basicInfo := api.GetBasicInfo()
56+
4857
if basicInfo != "" {
58+
4959
log.Printf("[INFO] Basic Info: %v ", basicInfo)
5060
var siteInfo map[string]string
5161
err := json.Unmarshal([]byte(basicInfo), &siteInfo)
5262
if err != nil {
5363
log.Printf("[ERROR] Failed to decode basic infomation, %v ", err.Error())
5464
}
55-
for {
56-
taskListContent := api.GetTaskList(config.TASKNUM)
57-
if taskListContent != "none" {
58-
task.Init(taskListContent, api, siteInfo)
59-
}
60-
break
65+
66+
var wg sync.WaitGroup
67+
for i := 0; i < config.TASKNUM; i++ {
68+
wg.Add(1)
69+
log.Printf("[Info] Thread %d start", i+1)
70+
threadID := i
71+
go func() {
72+
for {
73+
74+
api.Lock.Lock()
75+
taskListContent := api.GetTaskList(1)
76+
api.Lock.Unlock()
77+
78+
if taskListContent != "none" {
79+
task.Init(taskListContent, api, siteInfo, threadID)
80+
}
81+
time.Sleep(time.Duration(config.DURATION) * time.Second)
82+
}
83+
84+
}()
85+
time.Sleep(time.Duration(1) * time.Second)
6186
}
87+
88+
wg.Wait()
89+
6290
}
6391

6492
}

task/TaskAdapter.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ func (task *OneDriveUpload) uploadChunks(Client *onedrive.Client) {
107107
uploaded += (v.To - v.From + 1)
108108
task.Log(fmt.Sprintf("[Info] Chunk uploaded, From:%d To:%d Total:%d Complete:%.2f", v.From, v.To, task.Attr.Fsize, float32(uploaded)/float32(task.Attr.Fsize)))
109109
} else {
110-
//to-do:关闭上传Session
111-
110+
task.Info.apiInfo.CancelUploadSession(url)
112111
return
113112
}
114113
}
@@ -137,13 +136,12 @@ func (task *OneDriveUpload) uploadSingleChunk(chunk Chunk, Client *onedrive.Clie
137136
return false
138137
}
139138

140-
res, uploadErr := Client.UploadChunk(url, chunk.From, chunk.To, int(task.Attr.Fsize), r)
139+
_, uploadErr := Client.UploadChunk(url, chunk.From, chunk.To, int(task.Attr.Fsize), r)
141140
if uploadErr != "" {
142141
task.Log("[Error] Failed to upload chunk," + uploadErr)
143142
task.Error()
144143
return false
145144
}
146-
fmt.Println(res)
147145
r.Close()
148146
return true
149147
}

task/TaskPool.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package task
33
import (
44
"encoding/json"
55
"log"
6-
"sync"
76

87
"../api"
98
)
@@ -19,19 +18,16 @@ type SingleTaskInfo struct {
1918
}
2019

2120
//Init 初始化任务线程池
22-
func Init(taskListContent string, apiInfo api.ApiInfo, siteInfo map[string]string) {
21+
func Init(taskListContent string, apiInfo api.ApiInfo, siteInfo map[string]string, threadID int) {
2322
var taskStringList []SingleTaskInfo
2423
err := json.Unmarshal([]byte(taskListContent), &taskStringList)
2524
if err != nil {
2625
log.Printf("[ERROR] Failed to decode basic infomation, %v ", err.Error())
2726
}
28-
var wg sync.WaitGroup
2927
for _, v := range taskStringList {
30-
wg.Add(1)
28+
log.Printf("[Info][Thread %d] New task: %s", threadID, v.TaskName)
3129
signleTask := taskInfo{sqlInfo: v, apiInfo: apiInfo, siteInfo: siteInfo}
32-
go func(t taskInfo) {
33-
t.Run(&wg)
34-
}(signleTask)
30+
signleTask.Run()
31+
3532
}
36-
wg.Wait()
3733
}

task/task.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package task
22

33
import (
4-
"sync"
5-
64
"../api"
75
)
86

@@ -41,7 +39,7 @@ type Task interface {
4139
}
4240

4341
//Run 根据任务类型处理任务
44-
func (task *taskInfo) Run(wg *sync.WaitGroup) {
42+
func (task *taskInfo) Run() {
4543
var newTask Task
4644
switch {
4745
case task.sqlInfo.TaskType == "uploadSingleToOnedrive":
@@ -62,5 +60,4 @@ func (task *taskInfo) Run(wg *sync.WaitGroup) {
6260
newTask.Init()
6361
}
6462
newTask.Excute()
65-
wg.Done()
6663
}

0 commit comments

Comments
 (0)