Skip to content

Commit 145c821

Browse files
committed
feat: support deadletter apis
1 parent 75ebe00 commit 145c821

File tree

5 files changed

+236
-21
lines changed

5 files changed

+236
-21
lines changed

src/main/java/com/meitu/platform/lmstfy/client/LmstfyClient.java

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22

33
import com.meitu.platform.lmstfy.exception.LmstfyException;
44
import com.meitu.platform.lmstfy.exception.LmstfyIllegalRequestException;
5+
import com.meitu.platform.lmstfy.exception.LmstfyNotJobException;
56
import com.meitu.platform.lmstfy.exception.LmstfyUnexpectedException;
6-
import com.meitu.platform.lmstfy.response.ErrorResponse;
7+
import com.meitu.platform.lmstfy.response.*;
78
import com.meitu.platform.lmstfy.Job;
8-
import com.meitu.platform.lmstfy.response.LmstfyResponse;
9-
import com.meitu.platform.lmstfy.response.PublishResponse;
10-
import com.meitu.platform.lmstfy.response.QueueSizeResponse;
119
import okhttp3.*;
1210

1311
import java.io.IOException;
@@ -31,6 +29,7 @@ public class LmstfyClient {
3129
private static final String QUERY_TRIES = "tries";
3230
private static final String QUERY_TIMEOUT = "timeout";
3331
private static final String QUERY_TTR = "ttr";
32+
private static final String QUERY_LIMIT = "limit";
3433

3534
private String namespace;
3635
private String token;
@@ -39,7 +38,9 @@ public class LmstfyClient {
3938
private HttpUrl serviceAddress;
4039

4140

42-
public LmstfyClient(String host, int port, String namespace, String token) {
41+
public LmstfyClient(String host, int port, String namespace, String token,
42+
int readTimeoutSecond, int writeTimeoutSecond, int connectTimeoutSecond,
43+
int retryTimes, int retryIntervalMilliseconds) {
4344
this.namespace = namespace;
4445
this.token = token;
4546
this.serviceAddress = new HttpUrl.Builder()
@@ -50,14 +51,18 @@ public LmstfyClient(String host, int port, String namespace, String token) {
5051

5152
this.http = new OkHttpClient.Builder()
5253
.retryOnConnectionFailure(true)
53-
.addInterceptor(new OkHttpRetryInterceptor(3, 100))
54-
.readTimeout(600, TimeUnit.SECONDS)
55-
.writeTimeout(600, TimeUnit.SECONDS)
56-
.connectTimeout(5, TimeUnit.SECONDS)
54+
.addInterceptor(new OkHttpRetryInterceptor(retryTimes, retryIntervalMilliseconds))
55+
.readTimeout(readTimeoutSecond, TimeUnit.SECONDS)
56+
.writeTimeout(writeTimeoutSecond, TimeUnit.SECONDS)
57+
.connectTimeout(connectTimeoutSecond, TimeUnit.SECONDS)
5758
.connectionPool(new ConnectionPool(100, 5, TimeUnit.MINUTES))
5859
.build();
5960
}
6061

62+
public LmstfyClient(String host, int port, String namespace, String token) {
63+
this(host, port, namespace, token, 600, 600, 5, 3, 100);
64+
}
65+
6166
private LmstfyResponse doRequest(String method, HttpUrl url, RequestBody body) throws LmstfyException {
6267
Request request = new Request.Builder()
6368
.url(url)
@@ -114,7 +119,7 @@ public String publish(String queue, byte[] data, int ttlSecond, short tries, int
114119
* a job; if it's positive, this method would polling for new job until timeout.
115120
* @param queues You can consume multiple queues of the same namespace at once. The order of the queues in
116121
* the params implies the priority.
117-
* @return Return the job, if there is no job available, it will return null.
122+
* @return Return the job.
118123
* @throws LmstfyException
119124
*/
120125
public Job consume(int ttrSecond, int timeoutSecond, String... queues) throws LmstfyException {
@@ -126,14 +131,12 @@ public Job consume(int ttrSecond, int timeoutSecond, String... queues) throws Lm
126131
LmstfyResponse response = this.doRequest("GET", url, null);
127132
switch (response.getCode()) {
128133
case HTTP_OK:
129-
Job job = response.unmarshalBody(Job.class);
130-
job.setData(new String(Base64.getDecoder().decode(job.getBase64Data())));
131-
return job;
134+
return response.unmarshalToJob();
132135
case HTTP_BAD_REQUEST:
133136
ErrorResponse errorResponse = response.unmarshalBody(ErrorResponse.class);
134137
throw new LmstfyIllegalRequestException(response.getCode(), errorResponse.getError());
135138
case HTTP_NOT_FOUND:
136-
return null;
139+
throw new LmstfyNotJobException();
137140
default:
138141
throw new LmstfyUnexpectedException(response.getCode());
139142
}
@@ -223,20 +226,83 @@ public Job peekJob(String queue, String jobID) throws LmstfyException {
223226
return this.peek(url);
224227
}
225228

229+
/**
230+
* Peek the deadletter of the queue
231+
*
232+
* @param queue
233+
* @return
234+
* @throws LmstfyException
235+
*/
236+
public DeadLetterResponse peekDeadLetter(String queue) throws LmstfyException {
237+
HttpUrl url = genServiceUrlBuilder(PATH_API, this.namespace, queue, "deadletter")
238+
.build();
239+
240+
LmstfyResponse response = doRequest("GET", url, null);
241+
switch (response.getCode()) {
242+
case HTTP_OK:
243+
DeadLetterResponse deadLetterResponse = response.unmarshalBody(DeadLetterResponse.class);
244+
return deadLetterResponse;
245+
default:
246+
throw new LmstfyUnexpectedException(response.getCode());
247+
}
248+
}
249+
226250
private Job peek(HttpUrl url) throws LmstfyException {
227251
LmstfyResponse response = doRequest("GET", url, null);
228252
switch (response.getCode()) {
229253
case HTTP_OK:
230-
Job job = response.unmarshalBody(Job.class);
231-
job.setData(new String(Base64.getDecoder().decode(job.getBase64Data())));
232-
return job;
254+
return response.unmarshalToJob();
233255
case HTTP_NOT_FOUND:
234256
return null;
235257
default:
236258
throw new LmstfyUnexpectedException(response.getCode());
237259
}
238260
}
239261

262+
/**
263+
* Respawn job(s) in the dead letter
264+
*
265+
* @param queue
266+
* @param limit The number (upper limit) of the jobs to be respawned.
267+
* @param ttlSecond Time-to-live of this job in seconds, 0 means forever.
268+
* @return The number of jobs that're respawned.
269+
* @throws LmstfyException
270+
*/
271+
public int respawnDeadLetter(String queue, int limit, int ttlSecond) throws LmstfyException {
272+
HttpUrl url = genServiceUrlBuilder(PATH_API, this.namespace, queue, "deadletter")
273+
.addQueryParameter(QUERY_LIMIT, String.valueOf(limit))
274+
.addQueryParameter(QUERY_TTL, String.valueOf(ttlSecond))
275+
.build();
276+
LmstfyResponse response = doRequest("PUT", url, null);
277+
switch (response.getCode()) {
278+
case HTTP_OK:
279+
RespawnResponse respawnResponse = response.unmarshalBody(RespawnResponse.class);
280+
return respawnResponse.getCount();
281+
default:
282+
throw new LmstfyUnexpectedException(response.getCode());
283+
}
284+
}
285+
286+
/**
287+
* Delete job(s) in the dead letter
288+
*
289+
* @param queue
290+
* @param limit The number (upper limit) of the jobs to be deleted
291+
* @throws LmstfyException
292+
*/
293+
public void deleteDeadLetter(String queue, int limit) throws LmstfyException {
294+
HttpUrl url = genServiceUrlBuilder(PATH_API, this.namespace, queue, "deadletter")
295+
.addQueryParameter(QUERY_LIMIT, String.valueOf(limit))
296+
.build();
297+
LmstfyResponse response = doRequest("DELETE", url, null);
298+
switch (response.getCode()) {
299+
case HTTP_NO_CONTENT:
300+
return;
301+
default:
302+
throw new LmstfyUnexpectedException(response.getCode());
303+
}
304+
}
305+
240306
private HttpUrl.Builder genServiceUrlBuilder(String... pathSegments) {
241307
HttpUrl.Builder builder = serviceAddress.newBuilder();
242308
for (String pathSegment : pathSegments) {

src/main/java/com/meitu/platform/lmstfy/example.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,30 @@ public class example {
1616
private static String host = "localhost";
1717
private static int port = 7777;
1818
private static String namespace = "sdk-test";
19+
private static String queue = "sdk-test-queue";
20+
private static LmstfyClient client = new LmstfyClient(host, port, namespace, token);
1921

2022
public static void main(String[] args) {
2123
publish();
2224
consume();
25+
deleteAndAck();
26+
queueSize();
27+
peekQueue();
28+
peekJob();
2329
}
2430

2531
private static void publish() {
26-
LmstfyClient client = new LmstfyClient(host, port, namespace, token);
2732
try {
28-
String jobID = client.publish("sdk-test-queue", "test data".getBytes(), 5, (short) 2, 0);
33+
String jobID = client.publish(queue, "test data".getBytes(), 5, (short) 2, 0);
2934
System.out.println(jobID);
3035
} catch (LmstfyException e) {
3136
e.printStackTrace();
3237
}
3338
}
3439

3540
private static void consume() {
36-
LmstfyClient client = new LmstfyClient(host, port, namespace, token);
3741
try {
38-
Job job = client.consume(3, 2, "sdk-test-queue");
42+
Job job = client.consume(3, 2, queue);
3943
System.out.println(job.getData());
4044
} catch (LmstfyNotJobException e) {
4145
System.out.println("There has no available job");
@@ -44,4 +48,59 @@ private static void consume() {
4448
}
4549
}
4650

51+
private static void deleteAndAck() {
52+
try {
53+
String jobID1 = client.publish(queue, "test data".getBytes(), 5, (short) 2, 0);
54+
client.delete(queue, jobID1);
55+
String jobID2 = client.publish(queue, "test data".getBytes(), 5, (short) 2, 0);
56+
client.ack(queue, jobID2);
57+
} catch (LmstfyException e) {
58+
System.out.println("Request Lmstfy error, " + e.getMessage());
59+
}
60+
}
61+
62+
private static void queueSize() {
63+
try {
64+
int size1 = client.queueSize(queue);
65+
client.publish(queue, "test data".getBytes(), 5, (short) 2, 0);
66+
int size2 = client.queueSize(queue);
67+
System.out.printf("%d %d\n", size1, size2);
68+
} catch (LmstfyException e) {
69+
System.out.println("Request Lmstfy error, " + e.getMessage());
70+
}
71+
}
72+
73+
private static void peekQueue() {
74+
try {
75+
Job job = client.peekQueue(queue);
76+
if (job == null) {
77+
System.out.println("There has no available job to peek");
78+
return;
79+
}
80+
if (job.getData() == null) {
81+
System.out.printf("The job %s is expired\n", job.getJobID());
82+
} else {
83+
System.out.println(job.getData());
84+
}
85+
86+
} catch (LmstfyException e) {
87+
System.out.println("Request Lmstfy error, " + e.getMessage());
88+
}
89+
}
90+
91+
private static void peekJob() {
92+
try {
93+
String jobID = client.publish(queue, "test data".getBytes(), 5, (short) 2, 0);
94+
Job job = client.peekJob(queue, jobID);
95+
if (job == null) {
96+
System.out.println("There has no available job");
97+
} else {
98+
System.out.printf("%s\n", job.getData());
99+
}
100+
} catch (LmstfyException e) {
101+
System.out.println("Request Lmstfy error, " + e.getMessage());
102+
}
103+
}
104+
105+
47106
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.meitu.platform.lmstfy.response;
2+
3+
import com.google.gson.annotations.SerializedName;
4+
5+
/**
6+
* Description:
7+
*
8+
* @author Yesphet
9+
* @date 2019-12-10
10+
*/
11+
public class DeadLetterResponse {
12+
13+
private String namespace;
14+
private String queue;
15+
@SerializedName("deadletter_size")
16+
private int deadLetterSize;
17+
@SerializedName("deadletter_head")
18+
private String deadLetterHead;
19+
20+
public String getNamespace() {
21+
return namespace;
22+
}
23+
24+
public void setNamespace(String namespace) {
25+
this.namespace = namespace;
26+
}
27+
28+
public String getQueue() {
29+
return queue;
30+
}
31+
32+
public void setQueue(String queue) {
33+
this.queue = queue;
34+
}
35+
36+
public int getDeadLetterSize() {
37+
return deadLetterSize;
38+
}
39+
40+
public void setDeadLetterSize(int deadLetterSize) {
41+
this.deadLetterSize = deadLetterSize;
42+
}
43+
44+
public String getDeadLetterHead() {
45+
return deadLetterHead;
46+
}
47+
48+
public void setDeadLetterHead(String deadLetterHead) {
49+
this.deadLetterHead = deadLetterHead;
50+
}
51+
}

src/main/java/com/meitu/platform/lmstfy/response/LmstfyResponse.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.meitu.platform.lmstfy.response;
22

33
import com.google.gson.Gson;
4+
import com.meitu.platform.lmstfy.Job;
45
import okhttp3.Response;
56

67
import java.io.IOException;
8+
import java.util.Base64;
79

810
/**
911
* Description:
@@ -46,4 +48,12 @@ public String getRequestID() {
4648
public <T> T unmarshalBody(Class<T> tClass) {
4749
return gson.fromJson(this.body, tClass);
4850
}
51+
52+
public Job unmarshalToJob() {
53+
Job job = gson.fromJson(this.body, Job.class);
54+
if (job.getBase64Data() != null) {
55+
job.setData(new String(Base64.getDecoder().decode(job.getBase64Data())));
56+
}
57+
return job;
58+
}
4959
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.meitu.platform.lmstfy.response;
2+
3+
/**
4+
* Description:
5+
*
6+
* @author Yesphet
7+
* @date 2019-12-10
8+
*/
9+
public class RespawnResponse {
10+
11+
private String msg;
12+
private int count;
13+
14+
public String getMsg() {
15+
return msg;
16+
}
17+
18+
public void setMsg(String msg) {
19+
this.msg = msg;
20+
}
21+
22+
public int getCount() {
23+
return count;
24+
}
25+
26+
public void setCount(int count) {
27+
this.count = count;
28+
}
29+
}

0 commit comments

Comments
 (0)