Skip to content

Commit 625234c

Browse files
committed
feat: handle large file uploading while the file chunk is already created
1 parent 1435bc6 commit 625234c

File tree

7 files changed

+188
-14
lines changed

7 files changed

+188
-14
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*.dll
55
*.so
66
*.dylib
7+
debug
78

89
# Test binary, build with `go test -c`
910
*.test

conf.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
token: 52121225
22
api: http://127.0.0.1/Queue
3-
taskNum: 4
3+
taskNum: 4
4+
threadNum: 4

debug

17 KB
Binary file not shown.

main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ func main() {
5454
}
5555
for {
5656
taskListContent := api.GetTaskList(config.TASKNUM)
57-
task.Init(taskListContent, api, siteInfo)
57+
if taskListContent != "none" {
58+
task.Init(taskListContent, api, siteInfo)
59+
}
5860
break
5961
}
6062
}

onedrive/client.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package onedrive
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"io/ioutil"
67
"net/http"
@@ -40,6 +41,13 @@ type Response struct {
4041
ResString string
4142
}
4243

44+
type uploadSessionResponse struct {
45+
DataContxt string `json:"@odata.context"`
46+
ExpirationDateTime string `json:"expirationDateTime"`
47+
NextExpectedRanges []string `json:"nextExpectedRanges"`
48+
UploadURL string `json:"uploadUrl"`
49+
}
50+
4351
//Init 初始化客户端
4452
func (client *Client) Init() bool {
4553
client.HTTPClient = &http.Client{}
@@ -55,6 +63,47 @@ func (client *Client) PutFile(path string, file *os.File) (string, string) {
5563
return "", res.Error.Error.Message
5664
}
5765

66+
//CreateUploadSession 创建分片上传会话
67+
func (client *Client) CreateUploadSession(path string) (string, string) {
68+
res := client.apiPost(path, []byte(""))
69+
if res.Success {
70+
response := uploadSessionResponse{}
71+
err := json.Unmarshal([]byte(res.ResString), &response)
72+
if err != nil {
73+
return "", err.Error()
74+
}
75+
return response.UploadURL, ""
76+
}
77+
78+
return "", res.Error.Error.Message
79+
}
80+
81+
//apiPost 发送POST请求
82+
func (client *Client) apiPost(path string, jsonStr []byte) Response {
83+
if client.Tried > maxTry {
84+
return buildResponseResult("PUT failed, reached the maximum number of attempts.", 0)
85+
}
86+
87+
req, err := http.NewRequest("POST", APIURL+path, bytes.NewBuffer(jsonStr))
88+
if err != nil {
89+
return buildResponseResult(err.Error(), 0)
90+
}
91+
92+
req.Header.Set("Authorization", "Bearer "+client.AccessToken)
93+
req.Header.Set("Content-Type", "application/json")
94+
95+
res, err := client.HTTPClient.Do(req)
96+
if err != nil {
97+
client.Tried++
98+
return client.apiPost(path, jsonStr)
99+
}
100+
defer res.Body.Close()
101+
102+
r, _ := ioutil.ReadAll(res.Body)
103+
return client.praseResponse(string(r), res.StatusCode)
104+
105+
}
106+
58107
//apiPut 发送PUT请求
59108
func (client *Client) apiPut(path string, stream *os.File) Response {
60109
if client.Tried > maxTry {
@@ -80,7 +129,7 @@ func (client *Client) apiPut(path string, stream *os.File) Response {
80129
}
81130

82131
func (client *Client) praseResponse(res string, code int) Response {
83-
if code != 200 {
132+
if code != 200 && code != 202 {
84133
errorRes := ErrorResponse{}
85134
json.Unmarshal([]byte(res), &errorRes)
86135
return Response{

task/TaskAdapter.go

Lines changed: 124 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,36 @@ package task
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"log"
67
"os"
78
"strconv"
9+
"sync"
810

911
"../onedrive"
1012
)
1113

14+
const uploadThreadNum int = 4
15+
16+
type chunkFile struct {
17+
ID int `json:"id"`
18+
User int `json:"user"`
19+
CTX string `json:"ctx"`
20+
Time string `json:"time"`
21+
ObjName string `json:"obj_name"`
22+
ChunkID int `json:"chunk_id"`
23+
Sum int `json:"sum"`
24+
}
25+
1226
type oneDriveUploadAttr struct {
13-
Fname string `json:"fname"`
14-
Path string `json:"path"`
15-
Objname string `json:"objname"`
16-
SavePath string `json:"savePath"`
17-
Fsize uint64 `json:"fsize"`
18-
PicInfo string `json:"picInfo"`
19-
PolicyID int `json:"policyId"`
27+
Fname string `json:"fname"`
28+
Path string `json:"path"`
29+
Objname string `json:"objname"`
30+
SavePath string `json:"savePath"`
31+
Fsize uint64 `json:"fsize"`
32+
PicInfo string `json:"picInfo"`
33+
PolicyID int `json:"policyId"`
34+
Chunks []chunkFile `json:"chunks"`
2035
}
2136

2237
//OneDriveUpload OneDrive上传类型Task
@@ -44,6 +59,14 @@ type OnedriveState struct {
4459
} `json:"token"`
4560
}
4661

62+
//Chunk 文件分片
63+
type Chunk struct {
64+
Type int //分片方式,0-已切割为文件片,1-无需切割
65+
From int
66+
To int
67+
ChunkPath string
68+
}
69+
4770
//Excute 执行Onedrive上传Task
4871
func (task *OneDriveUpload) Excute() {
4972
authState := OnedriveState{}
@@ -55,13 +78,103 @@ func (task *OneDriveUpload) Excute() {
5578
Tried: 0,
5679
}
5780
Client.Init()
58-
var filePath string
59-
if task.Type == "UploadRegularRemoteDownloadFileToOnedrive" {
6081

82+
if task.Type == "uploadSingleToOnedrive" {
83+
task.uploadRegularFile(&Client)
84+
} else if task.Type == "uploadChunksToOnedrive" {
85+
task.uploadChunks(&Client)
86+
}
87+
88+
}
89+
90+
func (task *OneDriveUpload) uploadChunks(Client *onedrive.Client) {
91+
//获取上传URL
92+
url, err := Client.CreateUploadSession("/me/drive/root:/" + task.Attr.SavePath + "/" + task.Attr.Objname + ":/createUploadSession")
93+
if err != "" {
94+
task.Log("[Error] Failed to create upload session," + err)
95+
task.Error()
96+
return
97+
}
98+
fmt.Println(url)
99+
100+
chunkList, chunkErr := task.buildChunks()
101+
if chunkErr != "" {
102+
task.Log("[Error] Failed to upload chunks," + chunkErr)
103+
task.Error()
104+
return
105+
}
106+
107+
var wg sync.WaitGroup
108+
ch := make(chan Chunk)
109+
isFailed := false
110+
111+
for index := 0; index < uploadThreadNum; index++ {
112+
wg.Add(1)
113+
go task.uploadSingleChunk(&wg, ch, &isFailed)
114+
}
115+
116+
for _, v := range chunkList {
117+
if isFailed {
118+
close(ch)
119+
break
120+
}
121+
ch <- v
122+
}
123+
close(ch)
124+
wg.Wait()
125+
126+
}
127+
128+
func (task *OneDriveUpload) uploadSingleChunk(wg *sync.WaitGroup, ch chan Chunk, isFailed *bool) {
129+
for {
130+
chunk, opened := <-ch
131+
if !opened {
132+
fmt.Println("quit")
133+
wg.Done()
134+
return
135+
}
136+
fmt.Println(chunk.From)
137+
}
138+
}
139+
140+
func (task *OneDriveUpload) buildChunks() ([]Chunk, string) {
141+
142+
var chunkType int
143+
if task.Type == "uploadChunksToOnedrive" {
144+
chunkType = 0
61145
} else {
62-
filePath = task.BasePath + "public/uploads/" + task.Attr.SavePath + "/" + task.Attr.Objname
146+
chunkType = 1
63147
}
64148

149+
var chunkList []Chunk
150+
var offset int
151+
152+
for _, v := range task.Attr.Chunks {
153+
154+
chunkPath := task.BasePath + "public/uploads/chunks/" + v.ObjName + ".chunk"
155+
156+
fileInfo, err := os.Stat(chunkPath)
157+
if os.IsNotExist(err) {
158+
return chunkList, "Chunk file " + chunkPath + " not exist"
159+
}
160+
chunkSize := int(fileInfo.Size())
161+
162+
chunkList = append(chunkList, Chunk{
163+
Type: chunkType,
164+
From: offset,
165+
To: offset + chunkSize - 1,
166+
ChunkPath: chunkPath,
167+
})
168+
offset += chunkSize
169+
}
170+
171+
return chunkList, ""
172+
173+
}
174+
175+
func (task *OneDriveUpload) uploadRegularFile(Client *onedrive.Client) {
176+
var filePath string
177+
filePath = task.BasePath + "public/uploads/" + task.Attr.SavePath + "/" + task.Attr.Objname
65178
r, err := os.Open(filePath)
66179
defer r.Close()
67180
if err != nil {
@@ -74,14 +187,14 @@ func (task *OneDriveUpload) Excute() {
74187
if errorMsg != "" {
75188
task.Log("[Error] Upload Failed," + errorMsg)
76189
task.Error()
190+
return
77191
}
78192

79193
addRes := task.Info.apiInfo.SetSuccess(task.Info.sqlInfo.ID)
80194

81195
if addRes != "" {
82196
task.Log("[Error] " + addRes)
83197
}
84-
85198
}
86199

87200
//Init 执行Onedrive上传Task

task/task.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ func (task *taskInfo) Run(wg *sync.WaitGroup) {
5252
BasePath: task.siteInfo["basePath"],
5353
}
5454
newTask.Init()
55+
case task.sqlInfo.TaskType == "uploadChunksToOnedrive":
56+
newTask = &OneDriveUpload{
57+
Info: task,
58+
Tried: 0,
59+
Type: task.sqlInfo.TaskType,
60+
BasePath: task.siteInfo["basePath"],
61+
}
62+
newTask.Init()
5563
}
5664
newTask.Excute()
5765
wg.Done()

0 commit comments

Comments
 (0)