Skip to content

Commit 2fc7c6a

Browse files
authored
Merge pull request #1433 from nats-io/prioritized
(2.12) Prioritized Consumer Support
2 parents 04541cc + 96d2e9f commit 2fc7c6a

27 files changed

+599
-7
lines changed

src/main/java/io/nats/client/BaseConsumeOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class BaseConsumeOptions implements JsonSerializable {
4545
protected final int thresholdPercent;
4646
protected final long idleHeartbeat;
4747
protected final String group;
48+
protected final int priority;
4849
protected final long minPending;
4950
protected final long minAckPending;
5051
protected final boolean raiseStatusWarnings;
@@ -67,6 +68,7 @@ protected BaseConsumeOptions(Builder<?, ?> b) {
6768
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
6869

6970
this.group = b.group;
71+
this.priority = b.priority;
7072
this.minPending = b.minPending;
7173
this.minAckPending = b.minAckPending;
7274
raiseStatusWarnings = b.raiseStatusWarnings;
@@ -82,6 +84,7 @@ public String toJson() {
8284
addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
8385
addField(sb, THRESHOLD_PERCENT, thresholdPercent);
8486
addField(sb, GROUP, group);
87+
addField(sb, PRIORITY, priority);
8588
addField(sb, MIN_PENDING, minPending);
8689
addField(sb, MIN_ACK_PENDING, minAckPending);
8790
addFldWhenTrue(sb, RAISE_STATUS_WARNINGS, raiseStatusWarnings);
@@ -111,6 +114,10 @@ public String getGroup() {
111114
return group;
112115
}
113116

117+
public int getPriority() {
118+
return priority;
119+
}
120+
114121
public long getMinPending() {
115122
return minPending;
116123
}
@@ -126,6 +133,7 @@ protected static abstract class Builder<B, CO> {
126133
protected long expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
127134
protected boolean raiseStatusWarnings = false;
128135
protected String group;
136+
protected int priority;
129137
protected long minPending = -1;
130138
protected long minAckPending = -1;
131139

@@ -153,6 +161,7 @@ public B jsonValue(JsonValue jsonValue) {
153161
thresholdPercent(readInteger(jsonValue, THRESHOLD_PERCENT, -1));
154162
raiseStatusWarnings(readBoolean(jsonValue, RAISE_STATUS_WARNINGS, false));
155163
group(readStringEmptyAsNull(jsonValue, GROUP));
164+
priority(readInteger(jsonValue, PRIORITY, 0));
156165
minPending(readLong(jsonValue, MIN_PENDING, -1));
157166
minAckPending(readLong(jsonValue, MIN_ACK_PENDING, -1));
158167
return getThis();
@@ -239,6 +248,16 @@ public B group(String group) {
239248
return getThis();
240249
}
241250

251+
/**
252+
* Sets the priority for the group
253+
* @param priority the priority
254+
* @return Builder
255+
*/
256+
public B priority(int priority) {
257+
this.priority = priority;
258+
return getThis();
259+
}
260+
242261
/**
243262
* When specified, the consumer will only receive messages when the consumer has at least this many pending messages.
244263
* @param minPending the min pending

src/main/java/io/nats/client/BaseConsumerContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,4 +182,15 @@ public interface BaseConsumerContext {
182182
*/
183183
@NonNull
184184
MessageConsumer consume(@NonNull ConsumeOptions consumeOptions, @Nullable Dispatcher dispatcher, @NonNull MessageHandler handler) throws IOException, JetStreamApiException;
185+
186+
// TODO - PINNED CONSUMER SUPPORT
187+
// /**
188+
// * Unpins this consumer
189+
// * @param group the group name of the consumer's group
190+
// * @throws IOException covers various communication issues with the NATS
191+
// * server such as timeout or interruption
192+
// * @throws JetStreamApiException the request had an error related to the data
193+
// * @return true if the delete succeeded
194+
// */
195+
// boolean unpin(String group) throws IOException, JetStreamApiException;
185196
}

src/main/java/io/nats/client/JetStreamManagement.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,19 @@ public interface JetStreamManagement {
357357
*/
358358
boolean deleteMessage(String streamName, long seq, boolean erase) throws IOException, JetStreamApiException;
359359

360+
// TODO - PINNED CONSUMER SUPPORT
361+
// /**
362+
// * Unpins a consumer
363+
// * @param streamName name of the stream
364+
// * @param consumerName name of consumer
365+
// * @param consumerGroup name of the consumer's group
366+
// * @throws IOException covers various communication issues with the NATS
367+
// * server such as timeout or interruption
368+
// * @throws JetStreamApiException the request had an error related to the data
369+
// * @return true if the delete succeeded
370+
// */
371+
// boolean unpinConsumer(String streamName, String consumerName, String consumerGroup) throws IOException, JetStreamApiException;
372+
360373
/**
361374
* Gets a context for publishing and subscribing to subjects backed by Jetstream streams
362375
* and consumers, using the same connection and JetStreamOptions as the management.

src/main/java/io/nats/client/PullRequestOptions.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class PullRequestOptions implements JsonSerializable {
3333
private final Duration expiresIn;
3434
private final Duration idleHeartbeat;
3535
private final String group;
36+
private final int priority;
3637
private final long minPending;
3738
private final long minAckPending;
3839

@@ -43,6 +44,7 @@ public PullRequestOptions(Builder b) {
4344
this.expiresIn = b.expiresIn;
4445
this.idleHeartbeat = b.idleHeartbeat;
4546
this.group = b.group;
47+
this.priority = b.priority;
4648
this.minPending = b.minPending < 0 ? -1 : b.minPending;
4749
this.minAckPending = b.minAckPending < 0 ? -1 : b.minAckPending;
4850
}
@@ -56,13 +58,20 @@ public String toJson() {
5658
JsonUtils.addFldWhenTrue(sb, NO_WAIT, noWait);
5759
JsonUtils.addFieldAsNanos(sb, EXPIRES, expiresIn);
5860
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
59-
6061
JsonUtils.addField(sb, GROUP, group);
62+
JsonUtils.addFieldWhenGtZero(sb, PRIORITY, priority);
63+
// TODO - PINNED CONSUMER SUPPORT
64+
// JsonUtils.addField(sb, ID, getPinId());
6165
JsonUtils.addField(sb, MIN_PENDING, minPending);
6266
JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
6367
return JsonUtils.endJson(sb).toString();
6468
}
6569

70+
// TODO - PINNED CONSUMER SUPPORT
71+
// protected String getPinId() {
72+
// return null;
73+
// }
74+
6675
/**
6776
* Get the batch size option value
6877
* @return the batch size
@@ -107,6 +116,8 @@ public String getGroup() {
107116
return group;
108117
}
109118

119+
public int getPriority() { return priority; }
120+
110121
public long getMinPending() {
111122
return minPending;
112123
}
@@ -140,6 +151,7 @@ public static class Builder {
140151
private Duration expiresIn;
141152
private Duration idleHeartbeat;
142153
private String group;
154+
private int priority;
143155
private long minPending = -1;
144156
private long minAckPending = -1;
145157

@@ -233,6 +245,16 @@ public Builder group(String group) {
233245
return this;
234246
}
235247

248+
/**
249+
* Sets the priority within the group. Priority must be between 0 and 9 inclusive.
250+
* @param priority the priority
251+
* @return Builder
252+
*/
253+
public Builder priority(int priority) {
254+
this.priority = priority;
255+
return this;
256+
}
257+
236258
/**
237259
* When specified, the pull request will only receive messages when the consumer has at least this many pending messages.
238260
* @param minPending the min pending
@@ -261,6 +283,9 @@ public Builder minAckPending(long minAckPending) {
261283
*/
262284
public PullRequestOptions build() {
263285
validateGtZero(batchSize, "Pull batch size");
286+
if (priority < 0 || priority > 9) {
287+
throw new IllegalArgumentException("Priority must be between 0 and 9 inclusive.");
288+
}
264289
if (idleHeartbeat != null) {
265290
long idleNanosTemp = idleHeartbeat.toNanos() * 2;
266291
if (idleNanosTemp > 0) {

src/main/java/io/nats/client/api/ConsumerConfiguration.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public class ConsumerConfiguration implements JsonSerializable {
9191
protected final List<String> filterSubjects;
9292
protected final List<String> priorityGroups;
9393
protected final PriorityPolicy priorityPolicy;
94+
protected final Duration priorityTimeout;
9495

9596
protected ConsumerConfiguration(ConsumerConfiguration cc) {
9697
this.deliverPolicy = cc.deliverPolicy;
@@ -124,6 +125,7 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
124125
this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
125126
this.priorityGroups = cc.priorityGroups == null ? null : new ArrayList<>(cc.priorityGroups);
126127
this.priorityPolicy = cc.priorityPolicy;
128+
this.priorityTimeout = cc.priorityTimeout;
127129
}
128130

129131
// For the builder
@@ -165,6 +167,7 @@ protected ConsumerConfiguration(Builder b)
165167

166168
this.priorityGroups = b.priorityGroups;
167169
this.priorityPolicy = b.priorityPolicy;
170+
this.priorityTimeout = b.priorityTimeout;
168171
}
169172

170173
/**
@@ -215,6 +218,8 @@ else if (filterSubjects.size() == 1) {
215218
if (priorityPolicy != null && priorityPolicy != DEFAULT_PRIORITY_POLICY) {
216219
JsonUtils.addField(sb, PRIORITY_POLICY, priorityPolicy.toString());
217220
}
221+
JsonUtils.addFieldAsNanos(sb, PRIORITY_TIMEOUT, priorityTimeout);
222+
218223
return endJson(sb).toString();
219224
}
220225

@@ -497,14 +502,23 @@ public ZonedDateTime getPauseUntil() {
497502
}
498503

499504
/**
500-
* Gets the priority policy of this consumer configuration.
505+
* Gets the priority policy of this consumer configuration. Defaults to PriorityPolicy.None
501506
* @return the priority policy.
502507
*/
503508
@NonNull
504509
public PriorityPolicy getPriorityPolicy() {
505510
return GetOrDefault(priorityPolicy);
506511
}
507512

513+
/**
514+
* For pinned_client priority policy how long before the client times out
515+
* @return the duration
516+
*/
517+
@Nullable
518+
public Duration getPriorityTimeout() {
519+
return priorityTimeout;
520+
}
521+
508522
/**
509523
* Gets whether deliver policy of this consumer configuration was set or left unset
510524
* @return true if the policy was set, false if the policy was not set
@@ -641,6 +655,14 @@ public boolean priorityPolicyWasSet() {
641655
return priorityPolicy != null;
642656
}
643657

658+
/**
659+
* Gets whether priority timeout for this consumer configuration was set or left unset
660+
* @return true if the timeout was set, false if the timeout was not set
661+
*/
662+
public boolean priorityTimeoutWasSet() {
663+
return priorityTimeout != null;
664+
}
665+
644666
/**
645667
* Creates a builder for the options.
646668
* @return a publish options builder
@@ -703,6 +725,7 @@ public static class Builder {
703725

704726
private List<String> priorityGroups;
705727
private PriorityPolicy priorityPolicy;
728+
private Duration priorityTimeout;
706729

707730
/**
708731
* Construct the builder
@@ -760,6 +783,7 @@ public Builder(ConsumerConfiguration cc) {
760783
this.priorityGroups = new ArrayList<>(cc.priorityGroups);
761784
}
762785
this.priorityPolicy = cc.priorityPolicy;
786+
this.priorityTimeout = cc.priorityTimeout;
763787
}
764788
}
765789

@@ -843,6 +867,7 @@ public Builder jsonValue(JsonValue jsonValue) {
843867

844868
priorityGroups(readOptionalStringList(jsonValue, PRIORITY_GROUPS));
845869
priorityPolicy(PriorityPolicy.get(readString(jsonValue, PRIORITY_POLICY)));
870+
priorityTimeout(readNanos(jsonValue, PRIORITY_TIMEOUT));
846871

847872
return this;
848873
}
@@ -1405,6 +1430,26 @@ public Builder priorityPolicy(PriorityPolicy policy) {
14051430
return this;
14061431
}
14071432

1433+
/**
1434+
* Sets the priority policy timeout
1435+
* @param priorityTimeout the timeout
1436+
* @return Builder
1437+
*/
1438+
public Builder priorityTimeout(Duration priorityTimeout) {
1439+
this.priorityTimeout = normalize(priorityTimeout);
1440+
return this;
1441+
}
1442+
1443+
/**
1444+
* Sets the priority policy timeout
1445+
* @param priorityTimeoutMillis the timeout in milliseconds
1446+
* @return Builder
1447+
*/
1448+
public Builder priorityTimeout(long priorityTimeoutMillis) {
1449+
this.priorityTimeout = normalizeDuration(priorityTimeoutMillis);
1450+
return this;
1451+
}
1452+
14081453
/**
14091454
* Builds the ConsumerConfiguration
14101455
* @return The consumer configuration.

src/main/java/io/nats/client/api/ConsumerInfo.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.time.Duration;
2323
import java.time.ZonedDateTime;
24+
import java.util.List;
2425

2526
import static io.nats.client.support.ApiConstants.*;
2627
import static io.nats.client.support.JsonValueUtils.*;
@@ -46,6 +47,7 @@ public class ConsumerInfo extends ApiResponse<ConsumerInfo> {
4647
private final ClusterInfo clusterInfo;
4748
private final boolean pushBound;
4849
private final ZonedDateTime timestamp;
50+
private final List<PriorityGroupState> priorityGroupStates;
4951

5052
public ConsumerInfo(Message msg) {
5153
this(parseMessage(msg));
@@ -69,6 +71,7 @@ public ConsumerInfo(JsonValue vConsumerInfo) {
6971
clusterInfo = null;
7072
pushBound = false;
7173
timestamp = null;
74+
priorityGroupStates = null;
7275
}
7376
else {
7477
JsonValue jvConfig = nullValueIsError(jv, CONFIG, JsonValue.EMPTY_MAP) ;
@@ -92,6 +95,8 @@ public ConsumerInfo(JsonValue vConsumerInfo) {
9295
pushBound = readBoolean(jv, PUSH_BOUND);
9396

9497
timestamp = readDate(jv, TIMESTAMP);
98+
99+
priorityGroupStates = PriorityGroupState.optionalListOf(readObject(jv, PRIORITY_GROUPS));
95100
}
96101
}
97102

@@ -224,6 +229,15 @@ public ZonedDateTime getTimestamp() {
224229
return timestamp;
225230
}
226231

232+
/**
233+
* The state of Priority Groups
234+
* @return the list of Priority Groups, may be null
235+
*/
236+
@Nullable
237+
public List<PriorityGroupState> getPriorityGroupStates() {
238+
return priorityGroupStates;
239+
}
240+
227241
/**
228242
* A way to more accurately calculate pending during the initial state
229243
* of the consumer when messages may be unaccounted for in flight

0 commit comments

Comments
 (0)