Skip to content

Commit eb5c15e

Browse files
committed
add stream upload
1 parent 59868ee commit eb5c15e

File tree

9 files changed

+353
-4
lines changed

9 files changed

+353
-4
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
#Changelog
22

3+
## 7.2.2 (2016-11-04)
4+
### 增加
5+
* stream 方式上传
6+
37
## 7.2.1 (2016-11-03)
48
### 修正
59
* streaming publish url 过期时间单位问题

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def versionName() {
4242
return version
4343
}
4444

45-
def versionNameToCode(String version) {
45+
static def versionNameToCode(String version) {
4646
String v = version.replaceAll(/\./, '')
4747
return v.toLong()
4848
}

src/main/java/com/qiniu/common/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public final class Constants {
99
/**
1010
* 版本号
1111
*/
12-
public static final String VERSION = "7.2.1";
12+
public static final String VERSION = "7.2.2";
1313
/**
1414
* 块大小,不能改变
1515
*/

src/main/java/com/qiniu/common/Zone.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public static Zone zone2() {
2727
}
2828

2929
// 北美
30-
public static Zone zone_na0() {
30+
public static Zone zoneNa0() {
3131
return new FixedZone("http://up-na0.qiniu.com", "http://upload-na0.qiniu.com",
3232
"", "http://rs-na0.qbox.me", "http://rsf-na0.qbox.me", "http://iovip-na0.qbox.me",
3333
"https://up-na0.qbox.me", "http://api-na0.qiniu.com");
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package com.qiniu.storage;
2+
3+
import com.qiniu.common.Constants;
4+
import com.qiniu.common.QiniuException;
5+
import com.qiniu.http.Client;
6+
import com.qiniu.http.Response;
7+
import com.qiniu.storage.model.ResumeBlockInfo;
8+
import com.qiniu.util.StringMap;
9+
import com.qiniu.util.StringUtils;
10+
import com.qiniu.util.UrlSafeBase64;
11+
12+
import java.io.IOException;
13+
import java.io.InputStream;
14+
import java.util.ArrayList;
15+
16+
/**
17+
* Created by long on 2016/11/4.
18+
*/
19+
public final class StreamUploader {
20+
private final String upToken;
21+
private final String key;
22+
private final StringMap params;
23+
private final String mime;
24+
private final ArrayList<String> contexts;
25+
private final Configuration configuration;
26+
private final Client client;
27+
private final byte[] blockBuffer;
28+
private final InputStream stream;
29+
private long size;
30+
private String host;
31+
32+
StreamUploader(Client client, String upToken, String key, InputStream stream,
33+
StringMap params, String mime, Configuration configuration) {
34+
this.configuration = configuration;
35+
this.client = client;
36+
this.upToken = upToken;
37+
this.key = key;
38+
this.params = params;
39+
this.mime = mime == null ? Client.DefaultMime : mime;
40+
this.contexts = new ArrayList<>();
41+
this.blockBuffer = new byte[Constants.BLOCK_SIZE];
42+
this.stream = stream;
43+
}
44+
45+
public Response upload() throws QiniuException {
46+
if (host == null) {
47+
this.host = configuration.zone.upHost(upToken);
48+
}
49+
50+
long uploaded = 0;
51+
int ret = 0;
52+
boolean retry = false;
53+
int contextIndex = 0;
54+
55+
while (size == 0) {
56+
int bufferIndex = 0;
57+
while (ret != -1 && bufferIndex != blockBuffer.length) {
58+
try {
59+
ret = stream.read(blockBuffer, bufferIndex, blockBuffer.length - bufferIndex);
60+
} catch (IOException e) {
61+
close();
62+
throw new QiniuException(e);
63+
}
64+
if (ret != -1) {
65+
bufferIndex += ret;
66+
if (ret == 0) {
67+
try {
68+
Thread.sleep(100);
69+
} catch (InterruptedException e) {
70+
e.printStackTrace();
71+
}
72+
}
73+
} else {
74+
size = uploaded + bufferIndex;
75+
}
76+
}
77+
78+
Response response = null;
79+
try {
80+
response = makeBlock(blockBuffer, bufferIndex);
81+
} catch (QiniuException e) {
82+
if (e.code() < 0) {
83+
host = configuration.zone.upHostBackup(upToken);
84+
}
85+
if (e.response == null || e.response.needRetry()) {
86+
retry = true;
87+
} else {
88+
close();
89+
throw e;
90+
}
91+
}
92+
if (retry) {
93+
try {
94+
response = makeBlock(blockBuffer, bufferIndex);
95+
retry = false;
96+
} catch (QiniuException e) {
97+
close();
98+
throw e;
99+
}
100+
101+
}
102+
ResumeBlockInfo blockInfo = response.jsonToObject(ResumeBlockInfo.class);
103+
//TODO check return crc32
104+
// if blockInfo.crc32 != crc{}
105+
contexts.add(blockInfo.ctx);
106+
uploaded += bufferIndex;
107+
}
108+
close();
109+
110+
try {
111+
return makeFile();
112+
} catch (QiniuException e) {
113+
try {
114+
return makeFile();
115+
} catch (QiniuException e1) {
116+
throw e1;
117+
}
118+
}
119+
}
120+
121+
private Response makeBlock(byte[] block, int blockSize) throws QiniuException {
122+
String url = host + "/mkblk/" + blockSize;
123+
return post(url, block, 0, blockSize);
124+
}
125+
126+
private void close() {
127+
try {
128+
stream.close();
129+
} catch (IOException e) {
130+
e.printStackTrace();
131+
}
132+
}
133+
134+
private String fileUrl() {
135+
String url = host + "/mkfile/" + size + "/mimeType/"
136+
+ UrlSafeBase64.encodeToString(mime);
137+
final StringBuilder b = new StringBuilder(url);
138+
if (key != null) {
139+
b.append("/key/");
140+
b.append(UrlSafeBase64.encodeToString(key));
141+
}
142+
if (params != null) {
143+
params.forEach(new StringMap.Consumer() {
144+
@Override
145+
public void accept(String key, Object value) {
146+
b.append("/");
147+
b.append(key);
148+
b.append("/");
149+
b.append(UrlSafeBase64.encodeToString("" + value));
150+
}
151+
});
152+
}
153+
return b.toString();
154+
}
155+
156+
private Response makeFile() throws QiniuException {
157+
String url = fileUrl();
158+
String s = StringUtils.join(contexts, ",");
159+
return post(url, StringUtils.utf8Bytes(s));
160+
}
161+
162+
private Response post(String url, byte[] data) throws QiniuException {
163+
return client.post(url, data, new StringMap().put("Authorization", "UpToken " + upToken));
164+
}
165+
166+
private Response post(String url, byte[] data, int offset, int size) throws QiniuException {
167+
return client.post(url, data, offset, size, new StringMap().put("Authorization", "UpToken " + upToken),
168+
Client.DefaultMime);
169+
}
170+
}

src/main/java/com/qiniu/storage/UploadManager.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.io.File;
99
import java.io.IOException;
10+
import java.io.InputStream;
1011

1112
/**
1213
* 七牛文件上传管理器
@@ -173,7 +174,17 @@ public Response put(File file, String key, String token, StringMap params,
173174
return uploader.upload();
174175
}
175176

176-
177+
/**
178+
* 异步上传数据
179+
*
180+
* @param data 上传的数据
181+
* @param key 上传数据保存的文件名
182+
* @param token 上传凭证
183+
* @param params 自定义参数,如 params.put("x:foo", "foo")
184+
* @param mime 指定文件mimetype
185+
* @param checkCrc 是否验证crc32
186+
* @param handler 上传完成的回调函数
187+
*/
177188
public void asyncPut(final byte[] data, final String key, final String token, StringMap params,
178189
String mime, boolean checkCrc, UpCompletionHandler handler) throws IOException {
179190
checkArgs(key, data, null, token);
@@ -183,4 +194,29 @@ public void asyncPut(final byte[] data, final String key, final String token, St
183194
params = filterParam(params);
184195
new FormUploader(client, token, key, data, params, mime, checkCrc, configuration).asyncUpload(handler);
185196
}
197+
198+
/**
199+
* 流式上传,通常情况建议文件上传,可以使用持久化的断点记录。
200+
*
201+
* @param stream sha
202+
* @param key 上传文件保存的文件名
203+
* @param token 上传凭证
204+
* @param params 自定义参数,如 params.put("x:foo", "foo")
205+
* @param mime 指定文件mimetype
206+
*/
207+
public Response put(InputStream stream, String key, String token, StringMap params,
208+
String mime) throws QiniuException {
209+
String message = null;
210+
if (stream == null) {
211+
message = "no input data";
212+
} else if (token == null || token.equals("")) {
213+
message = "no token";
214+
}
215+
if (message != null) {
216+
throw new IllegalArgumentException(message);
217+
}
218+
StreamUploader uploader = new StreamUploader(client, token, key, stream,
219+
params, mime, configuration);
220+
return uploader.upload();
221+
}
186222
}

src/test/java/com/qiniu/storage/BucketTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,5 +322,15 @@ public void testBatch() {
322322
e.printStackTrace();
323323
fail();
324324
}
325+
326+
BucketManager.Batch opsDel = new BucketManager.Batch().delete(TestConfig.bucket,
327+
key, key1, key2, key3, key4);
328+
329+
try {
330+
bucketManager.batch(opsDel);
331+
} catch (QiniuException e) {
332+
e.printStackTrace();
333+
fail();
334+
}
325335
}
326336
}

src/test/java/com/qiniu/storage/ResumeUploadTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.qiniu.common.Zone;
77
import com.qiniu.http.Client;
88
import com.qiniu.http.Response;
9+
import com.qiniu.util.Etag;
910
import com.qiniu.util.StringMap;
1011
import org.junit.Test;
1112

@@ -52,6 +53,7 @@ private void template(int size, boolean https) throws IOException {
5253
UploadManager uploadManager = new UploadManager(c);
5354
final String expectKey = "\r\n?&r=" + size + "k";
5455
final File f = TempFile.createFile(size);
56+
final String etag = Etag.file(f);
5557
final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\""
5658
+ ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}";
5759
String token = TestConfig.testAuth.uploadToken(TestConfig.bucket, expectKey, 3600,
@@ -64,6 +66,7 @@ private void template(int size, boolean https) throws IOException {
6466
MyRet ret = r.jsonToObject(MyRet.class);
6567
assertEquals(expectKey, ret.key);
6668
assertEquals(f.getName(), ret.fname);
69+
assertEquals(etag, ret.hash);
6770
} catch (QiniuException e) {
6871
assertEquals("", e.response.bodyString());
6972
fail();

0 commit comments

Comments
 (0)